expected to be reduced in upcoming Python release. We are aiming for an
overhead of 10% or less on the pyperformance suite compared to the default
GIL-enabled build.
+
+
+Behavioral changes
+==================
+
+This section describes CPython behavioural changes with the free-threaded
+build.
+
+
+Context variables
+-----------------
+
+In the free-threaded build, the flag :data:`~sys.flags.thread_inherit_context`
+is set to true by default which causes threads created with
+:class:`threading.Thread` to start with a copy of the
+:class:`~contextvars.Context()` of the caller of
+:meth:`~threading.Thread.start`. In the default GIL-enabled build, the flag
+defaults to false so threads start with an
+empty :class:`~contextvars.Context()`.
+
+
+Warning filters
+---------------
+
+In the free-threaded build, the flag :data:`~sys.flags.context_aware_warnings`
+is set to true by default. In the default GIL-enabled build, the flag defaults
+to false. If the flag is true then the :class:`warnings.catch_warnings`
+context manager uses a context variable for warning filters. If the flag is
+false then :class:`~warnings.catch_warnings` modifies the global filters list,
+which is not thread-safe. See the :mod:`warnings` module for more details.
If :func:`setcontext` has not been called before :func:`getcontext`, then
:func:`getcontext` will automatically create a new context for use in the
-current thread.
-
-The new context is copied from a prototype context called *DefaultContext*. To
-control the defaults so that each thread will use the same values throughout the
-application, directly modify the *DefaultContext* object. This should be done
-*before* any threads are started so that there won't be a race condition between
-threads calling :func:`getcontext`. For example::
+current thread. New context objects have default values set from the
+:data:`decimal.DefaultContext` object.
+
+The :data:`sys.flags.thread_inherit_context` flag affects the context for
+new threads. If the flag is false, new threads will start with an empty
+context. In this case, :func:`getcontext` will create a new context object
+when called and use the default values from *DefaultContext*. If the flag
+is true, new threads will start with a copy of context from the caller of
+:meth:`threading.Thread.start`.
+
+To control the defaults so that each thread will use the same values throughout
+the application, directly modify the *DefaultContext* object. This should be
+done *before* any threads are started so that there won't be a race condition
+between threads calling :func:`getcontext`. For example::
# Set applicationwide defaults for all threads about to be launched
DefaultContext.prec = 12
.. data:: flags
The :term:`named tuple` *flags* exposes the status of command line
- flags. The attributes are read only.
+ flags. Flags should only be accessed only by name and not by index. The
+ attributes are read only.
.. list-table::
* - .. attribute:: flags.warn_default_encoding
- :option:`-X warn_default_encoding <-X>`
+ * - .. attribute:: flags.gil
+ - :option:`-X gil <-X>` and :envvar:`PYTHON_GIL`
+
+ * - .. attribute:: flags.thread_inherit_context
+ - :option:`-X thread_inherit_context <-X>` and
+ :envvar:`PYTHON_THREAD_INHERIT_CONTEXT`
+
+ * - .. attribute:: flags.context_aware_warnings
+ - :option:`-X thread_inherit_context <-X>` and
+ :envvar:`PYTHON_CONTEXT_AWARE_WARNINGS`
+
+
.. versionchanged:: 3.2
Added ``quiet`` attribute for the new :option:`-q` flag.
.. versionchanged:: 3.11
Added the ``int_max_str_digits`` attribute.
+ .. versionchanged:: 3.13
+ Added the ``gil`` attribute.
+
+ .. versionchanged:: 3.14
+ Added the ``thread_inherit_context`` attribute.
+
+ .. versionchanged:: 3.14
+ Added the ``context_aware_warnings`` attribute.
+
.. data:: float_info
.. class:: Thread(group=None, target=None, name=None, args=(), kwargs={}, *, \
- daemon=None)
+ daemon=None, context=None)
This constructor should always be called with keyword arguments. Arguments
are:
If ``None`` (the default), the daemonic property is inherited from the
current thread.
+ *context* is the :class:`~contextvars.Context` value to use when starting
+ the thread. The default value is ``None`` which indicates that the
+ :data:`sys.flags.thread_inherit_context` flag controls the behaviour. If
+ the flag is true, threads will start with a copy of the context of the
+ caller of :meth:`~Thread.start`. If false, they will start with an empty
+ context. To explicitly start with an empty context, pass a new instance of
+ :class:`~contextvars.Context()`. To explicitly start with a copy of the
+ current context, pass the value from :func:`~contextvars.copy_context`. The
+ flag defaults true on free-threaded builds and false otherwise.
+
If the subclass overrides the constructor, it must make sure to invoke the
base class constructor (``Thread.__init__()``) before doing anything else to
the thread.
.. versionchanged:: 3.10
Use the *target* name if *name* argument is omitted.
+ .. versionchanged:: 3.14
+ Added the *context* parameter.
+
.. method:: start()
Start the thread's activity.
While within the context manager all warnings will simply be ignored. This
allows you to use known-deprecated code without having to see the warning while
not suppressing the warning for other code that might not be aware of its use
-of deprecated code. Note: this can only be guaranteed in a single-threaded
-application. If two or more threads use the :class:`catch_warnings` context
-manager at the same time, the behavior is undefined.
+of deprecated code.
+ .. note::
+ See :ref:`warning-concurrent-safe` for details on the
+ concurrency-safety of the :class:`catch_warnings` context manager when
+ used in programs using multiple threads or async functions.
.. _warning-testing:
Once the context manager exits, the warnings filter is restored to its state
when the context was entered. This prevents tests from changing the warnings
filter in unexpected ways between tests and leading to indeterminate test
-results. The :func:`showwarning` function in the module is also restored to
-its original value. Note: this can only be guaranteed in a single-threaded
-application. If two or more threads use the :class:`catch_warnings` context
-manager at the same time, the behavior is undefined.
+results.
+
+ .. note::
+
+ See :ref:`warning-concurrent-safe` for details on the
+ concurrency-safety of the :class:`catch_warnings` context manager when
+ used in programs using multiple threads or async functions.
When testing multiple operations that raise the same kind of warning, it
is important to test them in a manner that confirms each operation is raising
.. note::
- The :class:`catch_warnings` manager works by replacing and
- then later restoring the module's
- :func:`showwarning` function and internal list of filter
- specifications. This means the context manager is modifying
- global state and therefore is not thread-safe.
+ See :ref:`warning-concurrent-safe` for details on the
+ concurrency-safety of the :class:`catch_warnings` context manager when
+ used in programs using multiple threads or async functions.
+
.. versionchanged:: 3.11
Added the *action*, *category*, *lineno*, and *append* parameters.
+
+
+.. _warning-concurrent-safe:
+
+Concurrent safety of Context Managers
+-------------------------------------
+
+The behavior of :class:`catch_warnings` context manager depends on the
+:data:`sys.flags.context_aware_warnings` flag. If the flag is true, the
+context manager behaves in a concurrent-safe fashion and otherwise not.
+Concurrent-safe means that it is both thread-safe and safe to use within
+:ref:`asyncio coroutines <coroutine>` and tasks. Being thread-safe means
+that behavior is predictable in a multi-threaded program. The flag defaults
+to true for free-threaded builds and false otherwise.
+
+If the :data:`~sys.flags.context_aware_warnings` flag is false, then
+:class:`catch_warnings` will modify the global attributes of the
+:mod:`warnings` module. This is not safe if used within a concurrent program
+(using multiple threads or using asyncio coroutines). For example, if two
+or more threads use the :class:`catch_warnings` class at the same time, the
+behavior is undefined.
+
+If the flag is true, :class:`catch_warnings` will not modify global
+attributes and will instead use a :class:`~contextvars.ContextVar` to
+store the newly established warning filtering state. A context variable
+provides thread-local storage and it makes the use of :class:`catch_warnings`
+thread-safe.
+
+The *record* parameter of the context handler also behaves differently
+depending on the value of the flag. When *record* is true and the flag is
+false, the context manager works by replacing and then later restoring the
+module's :func:`showwarning` function. That is not concurrent-safe.
+
+When *record* is true and the flag is true, the :func:`showwarning` function
+is not replaced. Instead, the recording status is indicated by an internal
+property in the context variable. In this case, the :func:`showwarning`
+function will not be restored when exiting the context handler.
+
+The :data:`~sys.flags.context_aware_warnings` flag can be set the :option:`-X
+context_aware_warnings<-X>` command-line option or by the
+:envvar:`PYTHON_CONTEXT_AWARE_WARNINGS` environment variable.
+
+ .. note::
+
+ It is likely that most programs that desire thread-safe
+ behaviour of the warnings module will also want to set the
+ :data:`~sys.flags.thread_inherit_context` flag to true. That flag
+ causes threads created by :class:`threading.Thread` to start
+ with a copy of the context variables from the thread starting
+ it. When true, the context established by :class:`catch_warnings`
+ in one thread will also apply to new threads started by it. If false,
+ new threads will start with an empty warnings context variable,
+ meaning that any filtering that was established by a
+ :class:`catch_warnings` context manager will no longer be active.
+
+.. versionchanged:: 3.14
+
+ Added the :data:`sys.flags.context_aware_warnings` flag and the use of a
+ context variable for :class:`catch_warnings` if the flag is true. Previous
+ versions of Python acted as if the flag was always set to false.
.. versionadded:: 3.13
+ * :samp:`-X thread_inherit_context={0,1}` causes :class:`~threading.Thread`
+ to, by default, use a copy of context of of the caller of
+ ``Thread.start()`` when starting. Otherwise, threads will start
+ with an empty context. If unset, the value of this option defaults
+ to ``1`` on free-threaded builds and to ``0`` otherwise. See also
+ :envvar:`PYTHON_THREAD_INHERIT_CONTEXT`.
+
+ .. versionadded:: 3.14
+
+ * :samp:`-X context_aware_warnings={0,1}` causes the
+ :class:`warnings.catch_warnings` context manager to use a
+ :class:`~contextvars.ContextVar` to store warnings filter state. If
+ unset, the value of this option defaults to ``1`` on free-threaded builds
+ and to ``0`` otherwise. See also :envvar:`PYTHON_CONTEXT_AWARE_WARNINGS`.
+
+ .. versionadded:: 3.14
+
It also allows passing arbitrary values and retrieving them through the
:data:`sys._xoptions` dictionary.
.. versionadded:: 3.13
+.. envvar:: PYTHON_THREAD_INHERIT_CONTEXT
+
+ If this variable is set to ``1`` then :class:`~threading.Thread` will,
+ by default, use a copy of context of of the caller of ``Thread.start()``
+ when starting. Otherwise, new threads will start with an empty context.
+ If unset, this variable defaults to ``1`` on free-threaded builds and to
+ ``0`` otherwise. See also :option:`-X thread_inherit_context<-X>`.
+
+ .. versionadded:: 3.14
+
+.. envvar:: PYTHON_CONTEXT_AWARE_WARNINGS
+
+ If set to ``1`` then the :class:`warnings.catch_warnings` context
+ manager will use a :class:`~contextvars.ContextVar` to store warnings
+ filter state. If unset, this variable defaults to ``1`` on
+ free-threaded builds and to ``0`` otherwise. See :option:`-X
+ context_aware_warnings<-X>`.
+
+ .. versionadded:: 3.14
+
Debug-mode variables
~~~~~~~~~~~~~~~~~~~~
int use_frozen_modules;
int safe_path;
int int_max_str_digits;
+ int thread_inherit_context;
+ int context_aware_warnings;
#ifdef __APPLE__
int use_system_logger;
#endif
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_feature_version));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_field_types));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_fields_));
+ _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_filters));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_finalizing));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_find_and_load));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_fix_up_module));
STRUCT_FOR_ID(_feature_version)
STRUCT_FOR_ID(_field_types)
STRUCT_FOR_ID(_fields_)
+ STRUCT_FOR_ID(_filters)
STRUCT_FOR_ID(_finalizing)
STRUCT_FOR_ID(_find_and_load)
STRUCT_FOR_ID(_fix_up_module)
PyObject *default_action; /* String */
_PyRecursiveMutex lock;
long filters_version;
+ PyObject *context;
};
struct _Py_mem_interp_free_queue {
INIT_ID(_feature_version), \
INIT_ID(_field_types), \
INIT_ID(_fields_), \
+ INIT_ID(_filters), \
INIT_ID(_finalizing), \
INIT_ID(_find_and_load), \
INIT_ID(_fix_up_module), \
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));
assert(PyUnicode_GET_LENGTH(string) != 1);
+ string = &_Py_ID(_filters);
+ _PyUnicode_InternStatic(interp, &string);
+ assert(_PyUnicode_CheckConsistency(string, 1));
+ assert(PyUnicode_GET_LENGTH(string) != 1);
string = &_Py_ID(_finalizing);
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));
--- /dev/null
+"""Python part of the warnings subsystem."""
+
+import sys
+import _contextvars
+import _thread
+
+
+__all__ = ["warn", "warn_explicit", "showwarning",
+ "formatwarning", "filterwarnings", "simplefilter",
+ "resetwarnings", "catch_warnings", "deprecated"]
+
+
+# Normally '_wm' is sys.modules['warnings'] but for unit tests it can be
+# a different module. User code is allowed to reassign global attributes
+# of the 'warnings' module, commonly 'filters' or 'showwarning'. So we
+# need to lookup these global attributes dynamically on the '_wm' object,
+# rather than binding them earlier. The code in this module consistently uses
+# '_wm.<something>' rather than using the globals of this module. If the
+# '_warnings' C extension is in use, some globals are replaced by functions
+# and variables defined in that extension.
+_wm = None
+
+
+def _set_module(module):
+ global _wm
+ _wm = module
+
+
+# filters contains a sequence of filter 5-tuples
+# The components of the 5-tuple are:
+# - an action: error, ignore, always, all, default, module, or once
+# - a compiled regex that must match the warning message
+# - a class representing the warning category
+# - a compiled regex that must match the module that is being warned
+# - a line number for the line being warning, or 0 to mean any line
+# If either if the compiled regexs are None, match anything.
+filters = []
+
+
+defaultaction = "default"
+onceregistry = {}
+_lock = _thread.RLock()
+_filters_version = 1
+
+
+# If true, catch_warnings() will use a context var to hold the modified
+# filters list. Otherwise, catch_warnings() will operate on the 'filters'
+# global of the warnings module.
+_use_context = sys.flags.context_aware_warnings
+
+
+class _Context:
+ def __init__(self, filters):
+ self._filters = filters
+ self.log = None # if set to a list, logging is enabled
+
+ def copy(self):
+ context = _Context(self._filters[:])
+ if self.log is not None:
+ context.log = self.log
+ return context
+
+ def _record_warning(self, msg):
+ self.log.append(msg)
+
+
+class _GlobalContext(_Context):
+ def __init__(self):
+ self.log = None
+
+ @property
+ def _filters(self):
+ # Since there is quite a lot of code that assigns to
+ # warnings.filters, this needs to return the current value of
+ # the module global.
+ try:
+ return _wm.filters
+ except AttributeError:
+ # 'filters' global was deleted. Do we need to actually handle this case?
+ return []
+
+
+_global_context = _GlobalContext()
+
+
+_warnings_context = _contextvars.ContextVar('warnings_context')
+
+
+def _get_context():
+ if not _use_context:
+ return _global_context
+ try:
+ return _wm._warnings_context.get()
+ except LookupError:
+ return _global_context
+
+
+def _set_context(context):
+ assert _use_context
+ _wm._warnings_context.set(context)
+
+
+def _new_context():
+ assert _use_context
+ old_context = _wm._get_context()
+ new_context = old_context.copy()
+ _wm._set_context(new_context)
+ return old_context, new_context
+
+
+def _get_filters():
+ """Return the current list of filters. This is a non-public API used by
+ module functions and by the unit tests."""
+ return _wm._get_context()._filters
+
+
+def _filters_mutated_lock_held():
+ _wm._filters_version += 1
+
+
+def showwarning(message, category, filename, lineno, file=None, line=None):
+ """Hook to write a warning to a file; replace if you like."""
+ msg = _wm.WarningMessage(message, category, filename, lineno, file, line)
+ _wm._showwarnmsg_impl(msg)
+
+
+def formatwarning(message, category, filename, lineno, line=None):
+ """Function to format a warning the standard way."""
+ msg = _wm.WarningMessage(message, category, filename, lineno, None, line)
+ return _wm._formatwarnmsg_impl(msg)
+
+
+def _showwarnmsg_impl(msg):
+ context = _wm._get_context()
+ if context.log is not None:
+ context._record_warning(msg)
+ return
+ file = msg.file
+ if file is None:
+ file = sys.stderr
+ if file is None:
+ # sys.stderr is None when run with pythonw.exe:
+ # warnings get lost
+ return
+ text = _wm._formatwarnmsg(msg)
+ try:
+ file.write(text)
+ except OSError:
+ # the file (probably stderr) is invalid - this warning gets lost.
+ pass
+
+
+def _formatwarnmsg_impl(msg):
+ category = msg.category.__name__
+ s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"
+
+ if msg.line is None:
+ try:
+ import linecache
+ line = linecache.getline(msg.filename, msg.lineno)
+ except Exception:
+ # When a warning is logged during Python shutdown, linecache
+ # and the import machinery don't work anymore
+ line = None
+ linecache = None
+ else:
+ line = msg.line
+ if line:
+ line = line.strip()
+ s += " %s\n" % line
+
+ if msg.source is not None:
+ try:
+ import tracemalloc
+ # Logging a warning should not raise a new exception:
+ # catch Exception, not only ImportError and RecursionError.
+ except Exception:
+ # don't suggest to enable tracemalloc if it's not available
+ suggest_tracemalloc = False
+ tb = None
+ else:
+ try:
+ suggest_tracemalloc = not tracemalloc.is_tracing()
+ tb = tracemalloc.get_object_traceback(msg.source)
+ except Exception:
+ # When a warning is logged during Python shutdown, tracemalloc
+ # and the import machinery don't work anymore
+ suggest_tracemalloc = False
+ tb = None
+
+ if tb is not None:
+ s += 'Object allocated at (most recent call last):\n'
+ for frame in tb:
+ s += (' File "%s", lineno %s\n'
+ % (frame.filename, frame.lineno))
+
+ try:
+ if linecache is not None:
+ line = linecache.getline(frame.filename, frame.lineno)
+ else:
+ line = None
+ except Exception:
+ line = None
+ if line:
+ line = line.strip()
+ s += ' %s\n' % line
+ elif suggest_tracemalloc:
+ s += (f'{category}: Enable tracemalloc to get the object '
+ f'allocation traceback\n')
+ return s
+
+
+# Keep a reference to check if the function was replaced
+_showwarning_orig = showwarning
+
+
+def _showwarnmsg(msg):
+ """Hook to write a warning to a file; replace if you like."""
+ try:
+ sw = _wm.showwarning
+ except AttributeError:
+ pass
+ else:
+ if sw is not _showwarning_orig:
+ # warnings.showwarning() was replaced
+ if not callable(sw):
+ raise TypeError("warnings.showwarning() must be set to a "
+ "function or method")
+
+ sw(msg.message, msg.category, msg.filename, msg.lineno,
+ msg.file, msg.line)
+ return
+ _wm._showwarnmsg_impl(msg)
+
+
+# Keep a reference to check if the function was replaced
+_formatwarning_orig = formatwarning
+
+
+def _formatwarnmsg(msg):
+ """Function to format a warning the standard way."""
+ try:
+ fw = _wm.formatwarning
+ except AttributeError:
+ pass
+ else:
+ if fw is not _formatwarning_orig:
+ # warnings.formatwarning() was replaced
+ return fw(msg.message, msg.category,
+ msg.filename, msg.lineno, msg.line)
+ return _wm._formatwarnmsg_impl(msg)
+
+
+def filterwarnings(action, message="", category=Warning, module="", lineno=0,
+ append=False):
+ """Insert an entry into the list of warnings filters (at the front).
+
+ 'action' -- one of "error", "ignore", "always", "all", "default", "module",
+ or "once"
+ 'message' -- a regex that the warning message must match
+ 'category' -- a class that the warning must be a subclass of
+ 'module' -- a regex that the module name must match
+ 'lineno' -- an integer line number, 0 matches all warnings
+ 'append' -- if true, append to the list of filters
+ """
+ if action not in {"error", "ignore", "always", "all", "default", "module", "once"}:
+ raise ValueError(f"invalid action: {action!r}")
+ if not isinstance(message, str):
+ raise TypeError("message must be a string")
+ if not isinstance(category, type) or not issubclass(category, Warning):
+ raise TypeError("category must be a Warning subclass")
+ if not isinstance(module, str):
+ raise TypeError("module must be a string")
+ if not isinstance(lineno, int):
+ raise TypeError("lineno must be an int")
+ if lineno < 0:
+ raise ValueError("lineno must be an int >= 0")
+
+ if message or module:
+ import re
+
+ if message:
+ message = re.compile(message, re.I)
+ else:
+ message = None
+ if module:
+ module = re.compile(module)
+ else:
+ module = None
+
+ _wm._add_filter(action, message, category, module, lineno, append=append)
+
+
+def simplefilter(action, category=Warning, lineno=0, append=False):
+ """Insert a simple entry into the list of warnings filters (at the front).
+
+ A simple filter matches all modules and messages.
+ 'action' -- one of "error", "ignore", "always", "all", "default", "module",
+ or "once"
+ 'category' -- a class that the warning must be a subclass of
+ 'lineno' -- an integer line number, 0 matches all warnings
+ 'append' -- if true, append to the list of filters
+ """
+ if action not in {"error", "ignore", "always", "all", "default", "module", "once"}:
+ raise ValueError(f"invalid action: {action!r}")
+ if not isinstance(lineno, int):
+ raise TypeError("lineno must be an int")
+ if lineno < 0:
+ raise ValueError("lineno must be an int >= 0")
+ _wm._add_filter(action, None, category, None, lineno, append=append)
+
+
+def _filters_mutated():
+ # Even though this function is not part of the public API, it's used by
+ # a fair amount of user code.
+ with _wm._lock:
+ _wm._filters_mutated_lock_held()
+
+
+def _add_filter(*item, append):
+ with _wm._lock:
+ filters = _wm._get_filters()
+ if not append:
+ # Remove possible duplicate filters, so new one will be placed
+ # in correct place. If append=True and duplicate exists, do nothing.
+ try:
+ filters.remove(item)
+ except ValueError:
+ pass
+ filters.insert(0, item)
+ else:
+ if item not in filters:
+ filters.append(item)
+ _wm._filters_mutated_lock_held()
+
+
+def resetwarnings():
+ """Clear the list of warning filters, so that no filters are active."""
+ with _wm._lock:
+ del _wm._get_filters()[:]
+ _wm._filters_mutated_lock_held()
+
+
+class _OptionError(Exception):
+ """Exception used by option processing helpers."""
+ pass
+
+
+# Helper to process -W options passed via sys.warnoptions
+def _processoptions(args):
+ for arg in args:
+ try:
+ _wm._setoption(arg)
+ except _wm._OptionError as msg:
+ print("Invalid -W option ignored:", msg, file=sys.stderr)
+
+
+# Helper for _processoptions()
+def _setoption(arg):
+ parts = arg.split(':')
+ if len(parts) > 5:
+ raise _wm._OptionError("too many fields (max 5): %r" % (arg,))
+ while len(parts) < 5:
+ parts.append('')
+ action, message, category, module, lineno = [s.strip()
+ for s in parts]
+ action = _wm._getaction(action)
+ category = _wm._getcategory(category)
+ if message or module:
+ import re
+ if message:
+ message = re.escape(message)
+ if module:
+ module = re.escape(module) + r'\Z'
+ if lineno:
+ try:
+ lineno = int(lineno)
+ if lineno < 0:
+ raise ValueError
+ except (ValueError, OverflowError):
+ raise _wm._OptionError("invalid lineno %r" % (lineno,)) from None
+ else:
+ lineno = 0
+ _wm.filterwarnings(action, message, category, module, lineno)
+
+
+# Helper for _setoption()
+def _getaction(action):
+ if not action:
+ return "default"
+ for a in ('default', 'always', 'all', 'ignore', 'module', 'once', 'error'):
+ if a.startswith(action):
+ return a
+ raise _wm._OptionError("invalid action: %r" % (action,))
+
+
+# Helper for _setoption()
+def _getcategory(category):
+ if not category:
+ return Warning
+ if '.' not in category:
+ import builtins as m
+ klass = category
+ else:
+ module, _, klass = category.rpartition('.')
+ try:
+ m = __import__(module, None, None, [klass])
+ except ImportError:
+ raise _wm._OptionError("invalid module name: %r" % (module,)) from None
+ try:
+ cat = getattr(m, klass)
+ except AttributeError:
+ raise _wm._OptionError("unknown warning category: %r" % (category,)) from None
+ if not issubclass(cat, Warning):
+ raise _wm._OptionError("invalid warning category: %r" % (category,))
+ return cat
+
+
+def _is_internal_filename(filename):
+ return 'importlib' in filename and '_bootstrap' in filename
+
+
+def _is_filename_to_skip(filename, skip_file_prefixes):
+ return any(filename.startswith(prefix) for prefix in skip_file_prefixes)
+
+
+def _is_internal_frame(frame):
+ """Signal whether the frame is an internal CPython implementation detail."""
+ return _is_internal_filename(frame.f_code.co_filename)
+
+
+def _next_external_frame(frame, skip_file_prefixes):
+ """Find the next frame that doesn't involve Python or user internals."""
+ frame = frame.f_back
+ while frame is not None and (
+ _is_internal_filename(filename := frame.f_code.co_filename) or
+ _is_filename_to_skip(filename, skip_file_prefixes)):
+ frame = frame.f_back
+ return frame
+
+
+# Code typically replaced by _warnings
+def warn(message, category=None, stacklevel=1, source=None,
+ *, skip_file_prefixes=()):
+ """Issue a warning, or maybe ignore it or raise an exception."""
+ # Check if message is already a Warning object
+ if isinstance(message, Warning):
+ category = message.__class__
+ # Check category argument
+ if category is None:
+ category = UserWarning
+ if not (isinstance(category, type) and issubclass(category, Warning)):
+ raise TypeError("category must be a Warning subclass, "
+ "not '{:s}'".format(type(category).__name__))
+ if not isinstance(skip_file_prefixes, tuple):
+ # The C version demands a tuple for implementation performance.
+ raise TypeError('skip_file_prefixes must be a tuple of strs.')
+ if skip_file_prefixes:
+ stacklevel = max(2, stacklevel)
+ # Get context information
+ try:
+ if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
+ # If frame is too small to care or if the warning originated in
+ # internal code, then do not try to hide any frames.
+ frame = sys._getframe(stacklevel)
+ else:
+ frame = sys._getframe(1)
+ # Look for one frame less since the above line starts us off.
+ for x in range(stacklevel-1):
+ frame = _next_external_frame(frame, skip_file_prefixes)
+ if frame is None:
+ raise ValueError
+ except ValueError:
+ globals = sys.__dict__
+ filename = "<sys>"
+ lineno = 0
+ else:
+ globals = frame.f_globals
+ filename = frame.f_code.co_filename
+ lineno = frame.f_lineno
+ if '__name__' in globals:
+ module = globals['__name__']
+ else:
+ module = "<string>"
+ registry = globals.setdefault("__warningregistry__", {})
+ _wm.warn_explicit(
+ message,
+ category,
+ filename,
+ lineno,
+ module,
+ registry,
+ globals,
+ source=source,
+ )
+
+
+def warn_explicit(message, category, filename, lineno,
+ module=None, registry=None, module_globals=None,
+ source=None):
+ lineno = int(lineno)
+ if module is None:
+ module = filename or "<unknown>"
+ if module[-3:].lower() == ".py":
+ module = module[:-3] # XXX What about leading pathname?
+ if isinstance(message, Warning):
+ text = str(message)
+ category = message.__class__
+ else:
+ text = message
+ message = category(message)
+ key = (text, category, lineno)
+ with _wm._lock:
+ if registry is None:
+ registry = {}
+ if registry.get('version', 0) != _wm._filters_version:
+ registry.clear()
+ registry['version'] = _wm._filters_version
+ # Quick test for common case
+ if registry.get(key):
+ return
+ # Search the filters
+ for item in _wm._get_filters():
+ action, msg, cat, mod, ln = item
+ if ((msg is None or msg.match(text)) and
+ issubclass(category, cat) and
+ (mod is None or mod.match(module)) and
+ (ln == 0 or lineno == ln)):
+ break
+ else:
+ action = _wm.defaultaction
+ # Early exit actions
+ if action == "ignore":
+ return
+
+ if action == "error":
+ raise message
+ # Other actions
+ if action == "once":
+ registry[key] = 1
+ oncekey = (text, category)
+ if _wm.onceregistry.get(oncekey):
+ return
+ _wm.onceregistry[oncekey] = 1
+ elif action in {"always", "all"}:
+ pass
+ elif action == "module":
+ registry[key] = 1
+ altkey = (text, category, 0)
+ if registry.get(altkey):
+ return
+ registry[altkey] = 1
+ elif action == "default":
+ registry[key] = 1
+ else:
+ # Unrecognized actions are errors
+ raise RuntimeError(
+ "Unrecognized action (%r) in warnings.filters:\n %s" %
+ (action, item))
+
+ # Prime the linecache for formatting, in case the
+ # "file" is actually in a zipfile or something.
+ import linecache
+ linecache.getlines(filename, module_globals)
+
+ # Print message and context
+ msg = _wm.WarningMessage(message, category, filename, lineno, source=source)
+ _wm._showwarnmsg(msg)
+
+
+class WarningMessage(object):
+
+ _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
+ "line", "source")
+
+ def __init__(self, message, category, filename, lineno, file=None,
+ line=None, source=None):
+ self.message = message
+ self.category = category
+ self.filename = filename
+ self.lineno = lineno
+ self.file = file
+ self.line = line
+ self.source = source
+ self._category_name = category.__name__ if category else None
+
+ def __str__(self):
+ return ("{message : %r, category : %r, filename : %r, lineno : %s, "
+ "line : %r}" % (self.message, self._category_name,
+ self.filename, self.lineno, self.line))
+
+
+class catch_warnings(object):
+
+ """A context manager that copies and restores the warnings filter upon
+ exiting the context.
+
+ The 'record' argument specifies whether warnings should be captured by a
+ custom implementation of warnings.showwarning() and be appended to a list
+ returned by the context manager. Otherwise None is returned by the context
+ manager. The objects appended to the list are arguments whose attributes
+ mirror the arguments to showwarning().
+
+ The 'module' argument is to specify an alternative module to the module
+ named 'warnings' and imported under that name. This argument is only useful
+ when testing the warnings module itself.
+
+ If the 'action' argument is not None, the remaining arguments are passed
+ to warnings.simplefilter() as if it were called immediately on entering the
+ context.
+ """
+
+ def __init__(self, *, record=False, module=None,
+ action=None, category=Warning, lineno=0, append=False):
+ """Specify whether to record warnings and if an alternative module
+ should be used other than sys.modules['warnings'].
+
+ """
+ self._record = record
+ self._module = sys.modules['warnings'] if module is None else module
+ self._entered = False
+ if action is None:
+ self._filter = None
+ else:
+ self._filter = (action, category, lineno, append)
+
+ def __repr__(self):
+ args = []
+ if self._record:
+ args.append("record=True")
+ if self._module is not sys.modules['warnings']:
+ args.append("module=%r" % self._module)
+ name = type(self).__name__
+ return "%s(%s)" % (name, ", ".join(args))
+
+ def __enter__(self):
+ if self._entered:
+ raise RuntimeError("Cannot enter %r twice" % self)
+ self._entered = True
+ with _wm._lock:
+ if _use_context:
+ self._saved_context, context = self._module._new_context()
+ else:
+ context = None
+ self._filters = self._module.filters
+ self._module.filters = self._filters[:]
+ self._showwarning = self._module.showwarning
+ self._showwarnmsg_impl = self._module._showwarnmsg_impl
+ self._module._filters_mutated_lock_held()
+ if self._record:
+ if _use_context:
+ context.log = log = []
+ else:
+ log = []
+ self._module._showwarnmsg_impl = log.append
+ # Reset showwarning() to the default implementation to make sure
+ # that _showwarnmsg() calls _showwarnmsg_impl()
+ self._module.showwarning = self._module._showwarning_orig
+ else:
+ log = None
+ if self._filter is not None:
+ self._module.simplefilter(*self._filter)
+ return log
+
+ def __exit__(self, *exc_info):
+ if not self._entered:
+ raise RuntimeError("Cannot exit %r without entering first" % self)
+ with _wm._lock:
+ if _use_context:
+ self._module._warnings_context.set(self._saved_context)
+ else:
+ self._module.filters = self._filters
+ self._module.showwarning = self._showwarning
+ self._module._showwarnmsg_impl = self._showwarnmsg_impl
+ self._module._filters_mutated_lock_held()
+
+
+class deprecated:
+ """Indicate that a class, function or overload is deprecated.
+
+ When this decorator is applied to an object, the type checker
+ will generate a diagnostic on usage of the deprecated object.
+
+ Usage:
+
+ @deprecated("Use B instead")
+ class A:
+ pass
+
+ @deprecated("Use g instead")
+ def f():
+ pass
+
+ @overload
+ @deprecated("int support is deprecated")
+ def g(x: int) -> int: ...
+ @overload
+ def g(x: str) -> int: ...
+
+ The warning specified by *category* will be emitted at runtime
+ on use of deprecated objects. For functions, that happens on calls;
+ for classes, on instantiation and on creation of subclasses.
+ If the *category* is ``None``, no warning is emitted at runtime.
+ The *stacklevel* determines where the
+ warning is emitted. If it is ``1`` (the default), the warning
+ is emitted at the direct caller of the deprecated object; if it
+ is higher, it is emitted further up the stack.
+ Static type checker behavior is not affected by the *category*
+ and *stacklevel* arguments.
+
+ The deprecation message passed to the decorator is saved in the
+ ``__deprecated__`` attribute on the decorated object.
+ If applied to an overload, the decorator
+ must be after the ``@overload`` decorator for the attribute to
+ exist on the overload as returned by ``get_overloads()``.
+
+ See PEP 702 for details.
+
+ """
+ def __init__(
+ self,
+ message: str,
+ /,
+ *,
+ category: type[Warning] | None = DeprecationWarning,
+ stacklevel: int = 1,
+ ) -> None:
+ if not isinstance(message, str):
+ raise TypeError(
+ f"Expected an object of type str for 'message', not {type(message).__name__!r}"
+ )
+ self.message = message
+ self.category = category
+ self.stacklevel = stacklevel
+
+ def __call__(self, arg, /):
+ # Make sure the inner functions created below don't
+ # retain a reference to self.
+ msg = self.message
+ category = self.category
+ stacklevel = self.stacklevel
+ if category is None:
+ arg.__deprecated__ = msg
+ return arg
+ elif isinstance(arg, type):
+ import functools
+ from types import MethodType
+
+ original_new = arg.__new__
+
+ @functools.wraps(original_new)
+ def __new__(cls, /, *args, **kwargs):
+ if cls is arg:
+ _wm.warn(msg, category=category, stacklevel=stacklevel + 1)
+ if original_new is not object.__new__:
+ return original_new(cls, *args, **kwargs)
+ # Mirrors a similar check in object.__new__.
+ elif cls.__init__ is object.__init__ and (args or kwargs):
+ raise TypeError(f"{cls.__name__}() takes no arguments")
+ else:
+ return original_new(cls)
+
+ arg.__new__ = staticmethod(__new__)
+
+ original_init_subclass = arg.__init_subclass__
+ # We need slightly different behavior if __init_subclass__
+ # is a bound method (likely if it was implemented in Python)
+ if isinstance(original_init_subclass, MethodType):
+ original_init_subclass = original_init_subclass.__func__
+
+ @functools.wraps(original_init_subclass)
+ def __init_subclass__(*args, **kwargs):
+ _wm.warn(msg, category=category, stacklevel=stacklevel + 1)
+ return original_init_subclass(*args, **kwargs)
+
+ arg.__init_subclass__ = classmethod(__init_subclass__)
+ # Or otherwise, which likely means it's a builtin such as
+ # object's implementation of __init_subclass__.
+ else:
+ @functools.wraps(original_init_subclass)
+ def __init_subclass__(*args, **kwargs):
+ _wm.warn(msg, category=category, stacklevel=stacklevel + 1)
+ return original_init_subclass(*args, **kwargs)
+
+ arg.__init_subclass__ = __init_subclass__
+
+ arg.__deprecated__ = __new__.__deprecated__ = msg
+ __init_subclass__.__deprecated__ = msg
+ return arg
+ elif callable(arg):
+ import functools
+ import inspect
+
+ @functools.wraps(arg)
+ def wrapper(*args, **kwargs):
+ _wm.warn(msg, category=category, stacklevel=stacklevel + 1)
+ return arg(*args, **kwargs)
+
+ if inspect.iscoroutinefunction(arg):
+ wrapper = inspect.markcoroutinefunction(wrapper)
+
+ arg.__deprecated__ = wrapper.__deprecated__ = msg
+ return wrapper
+ else:
+ raise TypeError(
+ "@deprecated decorator with non-None category must be applied to "
+ f"a class or callable, not {arg!r}"
+ )
+
+
+_DEPRECATED_MSG = "{name!r} is deprecated and slated for removal in Python {remove}"
+
+
+def _deprecated(name, message=_DEPRECATED_MSG, *, remove, _version=sys.version_info):
+ """Warn that *name* is deprecated or should be removed.
+
+ RuntimeError is raised if *remove* specifies a major/minor tuple older than
+ the current Python version or the same version but past the alpha.
+
+ The *message* argument is formatted with *name* and *remove* as a Python
+ version tuple (e.g. (3, 11)).
+
+ """
+ remove_formatted = f"{remove[0]}.{remove[1]}"
+ if (_version[:2] > remove) or (_version[:2] == remove and _version[3] != "alpha"):
+ msg = f"{name!r} was slated for removal after Python {remove_formatted} alpha"
+ raise RuntimeError(msg)
+ else:
+ msg = message.format(name=name, remove=remove_formatted)
+ _wm.warn(msg, DeprecationWarning, stacklevel=3)
+
+
+# Private utility function called by _PyErr_WarnUnawaitedCoroutine
+def _warn_unawaited_coroutine(coro):
+ msg_lines = [
+ f"coroutine '{coro.__qualname__}' was never awaited\n"
+ ]
+ if coro.cr_origin is not None:
+ import linecache, traceback
+ def extract():
+ for filename, lineno, funcname in reversed(coro.cr_origin):
+ line = linecache.getline(filename, lineno)
+ yield (filename, lineno, funcname, line)
+ msg_lines.append("Coroutine created at (most recent call last)\n")
+ msg_lines += traceback.format_list(list(extract()))
+ msg = "".join(msg_lines).rstrip("\n")
+ # Passing source= here means that if the user happens to have tracemalloc
+ # enabled and tracking where the coroutine was created, the warning will
+ # contain that traceback. This does mean that if they have *both*
+ # coroutine origin tracking *and* tracemalloc enabled, they'll get two
+ # partially-redundant tracebacks. If we wanted to be clever we could
+ # probably detect this case and avoid it, but for now we don't bother.
+ _wm.warn(
+ msg, category=RuntimeWarning, stacklevel=2, source=coro
+ )
+
+
+def _setup_defaults():
+ # Several warning categories are ignored by default in regular builds
+ if hasattr(sys, 'gettotalrefcount'):
+ return
+ _wm.filterwarnings("default", category=DeprecationWarning, module="__main__", append=1)
+ _wm.simplefilter("ignore", category=DeprecationWarning, append=1)
+ _wm.simplefilter("ignore", category=PendingDeprecationWarning, append=1)
+ _wm.simplefilter("ignore", category=ImportWarning, append=1)
+ _wm.simplefilter("ignore", category=ResourceWarning, append=1)
raise ValueError("Provide token or tokens returned by ignore_deprecations_from")
new_filters = []
+ old_filters = warnings._get_filters()
endswith = tuple(rf"(?#support{id(token)})" for token in tokens)
- for action, message, category, module, lineno in warnings.filters:
+ for action, message, category, module, lineno in old_filters:
if action == "ignore" and category is DeprecationWarning:
if isinstance(message, re.Pattern):
msg = message.pattern
if msg.endswith(endswith):
continue
new_filters.append((action, message, category, module, lineno))
- if warnings.filters != new_filters:
- warnings.filters[:] = new_filters
+ if old_filters != new_filters:
+ old_filters[:] = new_filters
warnings._filters_mutated()
registry = frame.f_globals.get('__warningregistry__')
if registry:
registry.clear()
- with warnings.catch_warnings(record=True) as w:
- # Set filter "always" to record all warnings. Because
- # test_warnings swap the module, we need to look up in
- # the sys.modules dictionary.
- sys.modules['warnings'].simplefilter("always")
+ # Because test_warnings swap the module, we need to look up in the
+ # sys.modules dictionary.
+ wmod = sys.modules['warnings']
+ with wmod.catch_warnings(record=True) as w:
+ # Set filter "always" to record all warnings.
+ wmod.simplefilter("always")
yield WarningsRecorder(w)
# Filter the recorded warnings
reraise = list(w)
("filesystem_errors", str, None),
("hash_seed", int, None),
("home", str | None, None),
+ ("thread_inherit_context", int, None),
+ ("context_aware_warnings", int, None),
("import_time", bool, None),
("inspect", bool, None),
("install_signal_handlers", bool, None),
]
if support.Py_DEBUG:
options.append(("run_presite", str | None, None))
- if sysconfig.get_config_var('Py_GIL_DISABLED'):
+ if support.Py_GIL_DISABLED:
options.append(("enable_gil", int, None))
options.append(("tlbc_enabled", int, None))
if support.MS_WINDOWS:
("warn_default_encoding", "warn_default_encoding", False),
("safe_path", "safe_path", False),
("int_max_str_digits", "int_max_str_digits", False),
- # "gil" is tested below
+ # "gil", "thread_inherit_context" and "context_aware_warnings" are tested below
):
with self.subTest(flag=flag, name=name, negate=negate):
value = config_get(name)
config_get('use_hash_seed') == 0
or config_get('hash_seed') != 0)
- if sysconfig.get_config_var('Py_GIL_DISABLED'):
+ if support.Py_GIL_DISABLED:
value = config_get('enable_gil')
expected = (value if value != -1 else None)
self.assertEqual(sys.flags.gil, expected)
+ expected_inherit_context = 1 if support.Py_GIL_DISABLED else 0
+ self.assertEqual(sys.flags.thread_inherit_context, expected_inherit_context)
+
+ expected_safe_warnings = 1 if support.Py_GIL_DISABLED else 0
+ self.assertEqual(sys.flags.context_aware_warnings, expected_safe_warnings)
+
def test_config_get_non_existent(self):
# Test PyConfig_Get() on non-existent option name
config_get = _testcapi.config_get
+import sys
import collections.abc
import concurrent.futures
import contextvars
tp.shutdown()
self.assertEqual(results, list(range(10)))
+ @isolated_context
+ @threading_helper.requires_working_threading()
+ def test_context_thread_inherit(self):
+ import threading
+
+ cvar = contextvars.ContextVar('cvar')
+
+ def run_context_none():
+ if sys.flags.thread_inherit_context:
+ expected = 1
+ else:
+ expected = None
+ self.assertEqual(cvar.get(None), expected)
+
+ # By default, context is inherited based on the
+ # sys.flags.thread_inherit_context option.
+ cvar.set(1)
+ thread = threading.Thread(target=run_context_none)
+ thread.start()
+ thread.join()
+
+ # Passing 'None' explicitly should have same behaviour as not
+ # passing parameter.
+ thread = threading.Thread(target=run_context_none, context=None)
+ thread.start()
+ thread.join()
+
+ # An explicit Context value can also be passed
+ custom_ctx = contextvars.Context()
+ custom_var = None
+
+ def setup_context():
+ nonlocal custom_var
+ custom_var = contextvars.ContextVar('custom')
+ custom_var.set(2)
+
+ custom_ctx.run(setup_context)
+
+ def run_custom():
+ self.assertEqual(custom_var.get(), 2)
+
+ thread = threading.Thread(target=run_custom, context=custom_ctx)
+ thread.start()
+ thread.join()
+
+ # You can also pass a new Context() object to start with an empty context
+ def run_empty():
+ with self.assertRaises(LookupError):
+ cvar.get()
+
+ thread = threading.Thread(target=run_empty, context=contextvars.Context())
+ thread.start()
+ thread.join()
+
def test_token_contextmanager_with_default(self):
ctx = contextvars.Context()
c = contextvars.ContextVar('c', default=42)
import random
import inspect
import threading
+import contextvars
if sys.platform == 'darwin':
self.finish1 = threading.Event()
self.finish2 = threading.Event()
- th1 = threading.Thread(target=thfunc1, args=(self,))
- th2 = threading.Thread(target=thfunc2, args=(self,))
+ # This test wants to start threads with an empty context, no matter
+ # the setting of sys.flags.thread_inherit_context. We pass the
+ # 'context' argument explicitly with an empty context instance.
+ th1 = threading.Thread(target=thfunc1, args=(self,),
+ context=contextvars.Context())
+ th2 = threading.Thread(target=thfunc2, args=(self,),
+ context=contextvars.Context())
th1.start()
th2.start()
INIT_LOOPS = 4
MAX_HASH_SEED = 4294967295
-ABI_THREAD = 't' if sysconfig.get_config_var('Py_GIL_DISABLED') else ''
+ABI_THREAD = 't' if support.Py_GIL_DISABLED else ''
# PLATSTDLIB_LANDMARK copied from Modules/getpath.py
if os.name == 'nt':
PLATSTDLIB_LANDMARK = f'{sys.platlibdir}'
PLATSTDLIB_LANDMARK = (f'{sys.platlibdir}/python{VERSION_MAJOR}.'
f'{VERSION_MINOR}{ABI_THREAD}/lib-dynload')
+DEFAULT_THREAD_INHERIT_CONTEXT = 1 if support.Py_GIL_DISABLED else 0
+DEFAULT_CONTEXT_AWARE_WARNINGS = 1 if support.Py_GIL_DISABLED else 0
# If we are running from a build dir, but the stdlib has been installed,
# some tests need to expect different results.
'tracemalloc': 0,
'perf_profiling': 0,
'import_time': False,
+ 'thread_inherit_context': DEFAULT_THREAD_INHERIT_CONTEXT,
+ 'context_aware_warnings': DEFAULT_CONTEXT_AWARE_WARNINGS,
'code_debug_ranges': True,
'show_ref_count': False,
'dump_refs': False,
import time
import unittest
import _testinternalcapi
+import warnings
from test.support import threading_helper
do_race(something_recursive, set_recursion_limit)
+@threading_helper.requires_working_threading()
+class TestWarningsRaces(TestBase):
+ def setUp(self):
+ self.saved_filters = warnings.filters[:]
+ warnings.resetwarnings()
+ # Add multiple filters to the list to increase odds of race.
+ for lineno in range(20):
+ warnings.filterwarnings('ignore', message='not matched', category=Warning, lineno=lineno)
+ # Override showwarning() so that we don't actually show warnings.
+ def showwarning(*args):
+ pass
+ warnings.showwarning = showwarning
+
+ def tearDown(self):
+ warnings.filters[:] = self.saved_filters
+ warnings.showwarning = warnings._showwarning_orig
+
+ def test_racing_warnings_filter(self):
+ # Modifying the warnings.filters list while another thread is using
+ # warn() should not crash or race.
+ def modify_filters():
+ time.sleep(0)
+ warnings.filters[:] = [('ignore', None, UserWarning, None, 0)]
+ time.sleep(0)
+ warnings.filters[:] = self.saved_filters
+
+ def emit_warning():
+ warnings.warn('dummy message', category=UserWarning)
+
+ do_race(modify_filters, emit_warning)
+
+
if __name__ == "__main__":
unittest.main()
class TestSupport(unittest.TestCase):
@classmethod
def setUpClass(cls):
- orig_filter_len = len(warnings.filters)
+ orig_filter_len = len(warnings._get_filters())
cls._warnings_helper_token = support.ignore_deprecations_from(
"test.support.warnings_helper", like=".*used in test_support.*"
)
cls._test_support_token = support.ignore_deprecations_from(
__name__, like=".*You should NOT be seeing this.*"
)
- assert len(warnings.filters) == orig_filter_len + 2
+ assert len(warnings._get_filters()) == orig_filter_len + 2
@classmethod
def tearDownClass(cls):
- orig_filter_len = len(warnings.filters)
+ orig_filter_len = len(warnings._get_filters())
support.clear_ignored_deprecations(
cls._warnings_helper_token,
cls._test_support_token,
)
- assert len(warnings.filters) == orig_filter_len - 2
+ assert len(warnings._get_filters()) == orig_filter_len - 2
def test_ignored_deprecations_are_silent(self):
"""Test support.ignore_deprecations_from() silences warnings"""
# symtable entry
# XXX
# sys.flags
- # FIXME: The +1 will not be necessary once gh-122575 is fixed
- check(sys.flags, vsize('') + self.P + self.P * (1 + len(sys.flags)))
+ # FIXME: The +3 is for the 'gil', 'thread_inherit_context' and
+ # 'context_aware_warnings' flags and will not be necessary once
+ # gh-122575 is fixed
+ check(sys.flags, vsize('') + self.P + self.P * (3 + len(sys.flags)))
def test_asyncgen_hooks(self):
old = sys.get_asyncgen_hooks()
from warnings import deprecated
-py_warnings = import_helper.import_fresh_module('warnings',
- blocked=['_warnings'])
-c_warnings = import_helper.import_fresh_module('warnings',
- fresh=['_warnings'])
+py_warnings = import_helper.import_fresh_module('_py_warnings')
+py_warnings._set_module(py_warnings)
+
+c_warnings = import_helper.import_fresh_module(
+ "warnings", fresh=["_warnings", "_py_warnings"]
+)
+c_warnings._set_module(c_warnings)
@contextmanager
def warnings_state(module):
except NameError:
pass
original_warnings = warning_tests.warnings
- original_filters = module.filters
- try:
+ if module._use_context:
+ saved_context, context = module._new_context()
+ else:
+ original_filters = module.filters
module.filters = original_filters[:]
+ try:
module.simplefilter("once")
warning_tests.warnings = module
yield
finally:
warning_tests.warnings = original_warnings
- module.filters = original_filters
+ if module._use_context:
+ module._set_context(saved_context)
+ else:
+ module.filters = original_filters
class TestWarning(Warning):
"""Testing the filtering functionality."""
def test_error(self):
- with original_warnings.catch_warnings(module=self.module) as w:
+ with self.module.catch_warnings() as w:
self.module.resetwarnings()
self.module.filterwarnings("error", category=UserWarning)
self.assertRaises(UserWarning, self.module.warn,
"FilterTests.test_error")
def test_error_after_default(self):
- with original_warnings.catch_warnings(module=self.module) as w:
+ with self.module.catch_warnings() as w:
self.module.resetwarnings()
message = "FilterTests.test_ignore_after_default"
def f():
self.assertRaises(UserWarning, f)
def test_ignore(self):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("ignore", category=UserWarning)
self.module.warn("FilterTests.test_ignore", UserWarning)
self.assertEqual(list(__warningregistry__), ['version'])
def test_ignore_after_default(self):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
message = "FilterTests.test_ignore_after_default"
def f():
def test_always_and_all(self):
for mode in {"always", "all"}:
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings(mode, category=UserWarning)
message = "FilterTests.test_always_and_all"
def test_always_and_all_after_default(self):
for mode in {"always", "all"}:
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
message = "FilterTests.test_always_and_all_after_ignore"
def f():
self.assertEqual(w[-1].message.args[0], message)
def test_default(self):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("default", category=UserWarning)
message = UserWarning("FilterTests.test_default")
raise ValueError("loop variant unhandled")
def test_module(self):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("module", category=UserWarning)
message = UserWarning("FilterTests.test_module")
self.assertEqual(len(w), 0)
def test_once(self):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("once", category=UserWarning)
message = UserWarning("FilterTests.test_once")
self.assertEqual(len(w), 0)
def test_module_globals(self):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.simplefilter("always", UserWarning)
# bpo-33509: module_globals=None must not crash
self.assertEqual(len(w), 2)
def test_inheritance(self):
- with original_warnings.catch_warnings(module=self.module) as w:
+ with self.module.catch_warnings() as w:
self.module.resetwarnings()
self.module.filterwarnings("error", category=Warning)
self.assertRaises(UserWarning, self.module.warn,
"FilterTests.test_inheritance", UserWarning)
def test_ordering(self):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("ignore", category=UserWarning)
self.module.filterwarnings("error", category=UserWarning,
def test_filterwarnings(self):
# Test filterwarnings().
# Implicitly also tests resetwarnings().
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.filterwarnings("error", "", Warning, "", 0)
self.assertRaises(UserWarning, self.module.warn, 'convert to error')
self.assertIs(w[-1].category, UserWarning)
def test_message_matching(self):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.simplefilter("ignore", UserWarning)
self.module.filterwarnings("error", "match", UserWarning)
self.assertRaises(UserWarning, self.module.warn, "match")
L[:] = []
L = [("default",X(),UserWarning,X(),0) for i in range(2)]
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.filters = L
self.module.warn_explicit(UserWarning("b"), None, "f.py", 42)
self.assertEqual(str(w[-1].message), "b")
def test_filterwarnings_duplicate_filters(self):
- with original_warnings.catch_warnings(module=self.module):
+ with self.module.catch_warnings():
self.module.resetwarnings()
self.module.filterwarnings("error", category=UserWarning)
- self.assertEqual(len(self.module.filters), 1)
+ self.assertEqual(len(self.module._get_filters()), 1)
self.module.filterwarnings("ignore", category=UserWarning)
self.module.filterwarnings("error", category=UserWarning)
self.assertEqual(
- len(self.module.filters), 2,
+ len(self.module._get_filters()), 2,
"filterwarnings inserted duplicate filter"
)
self.assertEqual(
- self.module.filters[0][0], "error",
+ self.module._get_filters()[0][0], "error",
"filterwarnings did not promote filter to "
"the beginning of list"
)
def test_simplefilter_duplicate_filters(self):
- with original_warnings.catch_warnings(module=self.module):
+ with self.module.catch_warnings():
self.module.resetwarnings()
self.module.simplefilter("error", category=UserWarning)
- self.assertEqual(len(self.module.filters), 1)
+ self.assertEqual(len(self.module._get_filters()), 1)
self.module.simplefilter("ignore", category=UserWarning)
self.module.simplefilter("error", category=UserWarning)
self.assertEqual(
- len(self.module.filters), 2,
+ len(self.module._get_filters()), 2,
"simplefilter inserted duplicate filter"
)
self.assertEqual(
- self.module.filters[0][0], "error",
+ self.module._get_filters()[0][0], "error",
"simplefilter did not promote filter to the beginning of list"
)
def test_append_duplicate(self):
- with original_warnings.catch_warnings(module=self.module,
- record=True) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.simplefilter("ignore")
self.module.simplefilter("error", append=True)
self.module.simplefilter("ignore", append=True)
self.module.warn("test_append_duplicate", category=UserWarning)
- self.assertEqual(len(self.module.filters), 2,
+ self.assertEqual(len(self.module._get_filters()), 2,
"simplefilter inserted duplicate filter"
)
self.assertEqual(len(w), 0,
self.module.simplefilter('ignore', lineno=-1)
def test_catchwarnings_with_simplefilter_ignore(self):
- with original_warnings.catch_warnings(module=self.module):
+ with self.module.catch_warnings(module=self.module):
self.module.resetwarnings()
self.module.simplefilter("error")
- with self.module.catch_warnings(
- module=self.module, action="ignore"
- ):
+ with self.module.catch_warnings(action="ignore"):
self.module.warn("This will be ignored")
def test_catchwarnings_with_simplefilter_error(self):
- with original_warnings.catch_warnings(module=self.module):
+ with self.module.catch_warnings():
self.module.resetwarnings()
with self.module.catch_warnings(
- module=self.module, action="error", category=FutureWarning
+ action="error", category=FutureWarning
):
with support.captured_stderr() as stderr:
error_msg = "Other types of warnings are not errors"
"""Test warnings.warn() and warnings.warn_explicit()."""
def test_message(self):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.simplefilter("once")
for i in range(4):
text = 'multi %d' %i # Different text on each call.
def test_warn_nonstandard_types(self):
# warn() should handle non-standard types without issue.
for ob in (Warning, None, 42):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.simplefilter("once")
self.module.warn(ob)
# Don't directly compare objects since
def test_filename(self):
with warnings_state(self.module):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
warning_tests.inner("spam1")
self.assertEqual(os.path.basename(w[-1].filename),
"stacklevel.py")
# Test stacklevel argument
# make sure all messages are different, so the warning won't be skipped
with warnings_state(self.module):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
warning_tests.inner("spam3", stacklevel=1)
self.assertEqual(os.path.basename(w[-1].filename),
"stacklevel.py")
# Issue #24305: With stacklevel=2, module-level warnings should work.
import_helper.unload('test.test_warnings.data.import_warning')
with warnings_state(self.module):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.simplefilter('always')
import test.test_warnings.data.import_warning # noqa: F401
self.assertEqual(len(w), 1)
def test_skip_file_prefixes(self):
with warnings_state(self.module):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.simplefilter('always')
# Warning never attributed to the data/ package.
# see: gh-126209
with warnings_state(self.module):
skipped = warning_tests.__file__
- with original_warnings.catch_warnings(
- record=True, module=self.module,
- ) as w:
+ with self.module.catch_warnings(record=True) as w:
warning_tests.outer("msg", skip_file_prefixes=(skipped,))
self.assertEqual(len(w), 1)
codeobj = compile(("import warnings\n"
"warnings.warn('hello', UserWarning)"),
filename, "exec")
- with original_warnings.catch_warnings(record=True) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.simplefilter("always", category=UserWarning)
exec(codeobj)
self.assertEqual(w[0].filename, filename)
def test_warn_explicit_non_ascii_filename(self):
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("always", category=UserWarning)
filenames = ["nonascii\xe9\u20ac"]
self.assertIn('category must be a Warning subclass, not ',
str(cm.exception))
- with original_warnings.catch_warnings(module=self.module):
+ with self.module.catch_warnings():
self.module.resetwarnings()
self.module.filterwarnings('default')
with self.assertWarns(MyWarningClass) as cm:
self.assertIsInstance(cm.warning, Warning)
def check_module_globals(self, module_globals):
- with original_warnings.catch_warnings(module=self.module, record=True) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.filterwarnings('default')
self.module.warn_explicit(
'eggs', UserWarning, 'bar', 1,
if self.module is py_warnings:
self.check_module_globals(module_globals)
return
- with original_warnings.catch_warnings(module=self.module, record=True) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.filterwarnings('always')
with self.assertRaisesRegex(errtype, re.escape(errmsg)):
self.module.warn_explicit(
if self.module is py_warnings:
self.check_module_globals(module_globals)
return
- with original_warnings.catch_warnings(module=self.module, record=True) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.filterwarnings('always')
self.module.warn_explicit(
'eggs', UserWarning, 'bar', 1,
def test_improper_input(self):
# Uses the private _setoption() function to test the parsing
# of command-line warning arguments
- with original_warnings.catch_warnings(module=self.module):
+ with self.module.catch_warnings():
self.assertRaises(self.module._OptionError,
self.module._setoption, '1:2:3:4:5:6')
self.assertRaises(self.module._OptionError,
self.assertRaises(UserWarning, self.module.warn, 'convert to error')
def test_import_from_module(self):
- with original_warnings.catch_warnings(module=self.module):
+ with self.module.catch_warnings():
self.module._setoption('ignore::Warning')
with self.assertRaises(self.module._OptionError):
self.module._setoption('ignore::TestWarning')
def test_filter(self):
# Everything should function even if 'filters' is not in warnings.
- with original_warnings.catch_warnings(module=self.module) as w:
+ with self.module.catch_warnings() as w:
self.module.filterwarnings("error", "", Warning, "", 0)
self.assertRaises(UserWarning, self.module.warn,
'convert to error')
try:
original_registry = self.module.onceregistry
__warningregistry__ = {}
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
self.module.filterwarnings("once", category=UserWarning)
self.module.warn_explicit(message, UserWarning, "file", 42)
message = UserWarning("defaultaction test")
original = self.module.defaultaction
try:
- with original_warnings.catch_warnings(record=True,
- module=self.module) as w:
+ with self.module.catch_warnings(record=True) as w:
self.module.resetwarnings()
registry = {}
self.module.warn_explicit(message, UserWarning, "<test>", 42,
def test_showwarning_missing(self):
# Test that showwarning() missing is okay.
+ if self.module._use_context:
+ # If _use_context is true, the warnings module does not
+ # override/restore showwarning()
+ return
text = 'del showwarning test'
- with original_warnings.catch_warnings(module=self.module):
+ with self.module.catch_warnings():
self.module.filterwarnings("always", category=UserWarning)
del self.module.showwarning
with support.captured_output('stderr') as stream:
def test_showwarnmsg_missing(self):
# Test that _showwarnmsg() missing is okay.
text = 'del _showwarnmsg test'
- with original_warnings.catch_warnings(module=self.module):
+ with self.module.catch_warnings():
self.module.filterwarnings("always", category=UserWarning)
show = self.module._showwarnmsg
self.assertIn(text, result)
def test_showwarning_not_callable(self):
- with original_warnings.catch_warnings(module=self.module):
+ with self.module.catch_warnings():
self.module.filterwarnings("always", category=UserWarning)
self.module.showwarning = print
with support.captured_output('stdout'):
def test_show_warning_output(self):
# With showwarning() missing, make sure that output is okay.
text = 'test show_warning'
- with original_warnings.catch_warnings(module=self.module):
+ with self.module.catch_warnings():
self.module.filterwarnings("always", category=UserWarning)
del self.module.showwarning
with support.captured_output('stderr') as stream:
globals_dict = globals()
oldfile = globals_dict['__file__']
try:
- catch = original_warnings.catch_warnings(record=True,
- module=self.module)
+ catch = self.module.catch_warnings(record=True)
with catch as w:
self.module.filterwarnings("always", category=UserWarning)
globals_dict['__file__'] = None
- original_warnings.warn('test', UserWarning)
+ self.module.warn('test', UserWarning)
self.assertTrue(len(w))
finally:
globals_dict['__file__'] = oldfile
wmod = self.module
- with original_warnings.catch_warnings(module=wmod):
+ with wmod.catch_warnings():
wmod.filterwarnings('default', category=UserWarning)
linecache.clearcache()
# warn_explicit() shouldn't raise a SystemError in case
# warnings.onceregistry isn't a dictionary.
wmod = self.module
- with original_warnings.catch_warnings(module=wmod):
+ with wmod.catch_warnings():
wmod.filterwarnings('once')
with support.swap_attr(wmod, 'onceregistry', None):
with self.assertRaises(TypeError):
# warn_explicit() shouldn't cause an assertion failure in case of a
# bad warnings.filters or warnings.defaultaction.
wmod = self.module
- with original_warnings.catch_warnings(module=wmod):
- wmod.filters = [(None, None, Warning, None, 0)]
+ with wmod.catch_warnings():
+ wmod._get_filters()[:] = [(None, None, Warning, None, 0)]
with self.assertRaises(TypeError):
wmod.warn_explicit('foo', Warning, 'bar', 1)
- wmod.filters = []
+ wmod._get_filters()[:] = []
with support.swap_attr(wmod, 'defaultaction', None), \
self.assertRaises(TypeError):
wmod.warn_explicit('foo', Warning, 'bar', 1)
def test_issue31566(self):
# warn() shouldn't cause an assertion failure in case of a bad
# __name__ global.
- with original_warnings.catch_warnings(module=self.module):
+ with self.module.catch_warnings():
self.module.filterwarnings('error', category=UserWarning)
with support.swap_item(globals(), '__name__', b'foo'), \
support.swap_item(globals(), '__file__', None):
"""Test catch_warnings()."""
def test_catch_warnings_restore(self):
+ if self.module._use_context:
+ return # test disabled if using context vars
wmod = self.module
orig_filters = wmod.filters
orig_showwarning = wmod.showwarning
# Ensure both showwarning and filters are restored when recording
- with wmod.catch_warnings(module=wmod, record=True):
+ with wmod.catch_warnings(record=True):
wmod.filters = wmod.showwarning = object()
self.assertIs(wmod.filters, orig_filters)
self.assertIs(wmod.showwarning, orig_showwarning)
# Same test, but with recording disabled
- with wmod.catch_warnings(module=wmod, record=False):
+ with wmod.catch_warnings(record=False):
wmod.filters = wmod.showwarning = object()
self.assertIs(wmod.filters, orig_filters)
self.assertIs(wmod.showwarning, orig_showwarning)
def test_catch_warnings_recording(self):
wmod = self.module
# Ensure warnings are recorded when requested
- with wmod.catch_warnings(module=wmod, record=True) as w:
+ with wmod.catch_warnings(record=True) as w:
self.assertEqual(w, [])
self.assertIs(type(w), list)
wmod.simplefilter("always")
self.assertEqual(w, [])
# Ensure warnings are not recorded when not requested
orig_showwarning = wmod.showwarning
- with wmod.catch_warnings(module=wmod, record=False) as w:
+ with wmod.catch_warnings(record=False) as w:
self.assertIsNone(w)
self.assertIs(wmod.showwarning, orig_showwarning)
def test_catch_warnings_reentry_guard(self):
wmod = self.module
# Ensure catch_warnings is protected against incorrect usage
- x = wmod.catch_warnings(module=wmod, record=True)
+ x = wmod.catch_warnings(record=True)
self.assertRaises(RuntimeError, x.__exit__)
with x:
self.assertRaises(RuntimeError, x.__enter__)
# Same test, but with recording disabled
- x = wmod.catch_warnings(module=wmod, record=False)
+ x = wmod.catch_warnings(record=False)
self.assertRaises(RuntimeError, x.__exit__)
with x:
self.assertRaises(RuntimeError, x.__enter__)
def test_catch_warnings_defaults(self):
wmod = self.module
- orig_filters = wmod.filters
+ orig_filters = wmod._get_filters()
orig_showwarning = wmod.showwarning
# Ensure default behaviour is not to record warnings
- with wmod.catch_warnings(module=wmod) as w:
+ with wmod.catch_warnings() as w:
self.assertIsNone(w)
self.assertIs(wmod.showwarning, orig_showwarning)
- self.assertIsNot(wmod.filters, orig_filters)
- self.assertIs(wmod.filters, orig_filters)
+ self.assertIsNot(wmod._get_filters(), orig_filters)
+ self.assertIs(wmod._get_filters(), orig_filters)
if wmod is sys.modules['warnings']:
# Ensure the default module is this one
with wmod.catch_warnings() as w:
self.assertIsNone(w)
self.assertIs(wmod.showwarning, orig_showwarning)
- self.assertIsNot(wmod.filters, orig_filters)
- self.assertIs(wmod.filters, orig_filters)
+ self.assertIsNot(wmod._get_filters(), orig_filters)
+ self.assertIs(wmod._get_filters(), orig_filters)
def test_record_override_showwarning_before(self):
# Issue #28835: If warnings.showwarning() was overridden, make sure
# that catch_warnings(record=True) overrides it again.
+ if self.module._use_context:
+ # If _use_context is true, the warnings module does not restore
+ # showwarning()
+ return
text = "This is a warning"
wmod = self.module
my_log = []
# Override warnings.showwarning() before calling catch_warnings()
with support.swap_attr(wmod, 'showwarning', my_logger):
- with wmod.catch_warnings(module=wmod, record=True) as log:
+ with wmod.catch_warnings(record=True) as log:
self.assertIsNot(wmod.showwarning, my_logger)
wmod.simplefilter("always")
def test_record_override_showwarning_inside(self):
# Issue #28835: It is possible to override warnings.showwarning()
# in the catch_warnings(record=True) context manager.
+ if self.module._use_context:
+ # If _use_context is true, the warnings module does not restore
+ # showwarning()
+ return
text = "This is a warning"
wmod = self.module
my_log = []
nonlocal my_log
my_log.append(message)
- with wmod.catch_warnings(module=wmod, record=True) as log:
+ with wmod.catch_warnings(record=True) as log:
wmod.simplefilter("always")
wmod.showwarning = my_logger
wmod.warn(text)
code = "import sys; sys.modules.pop('warnings', None); sys.modules['_warnings'] = None; "
else:
code = ""
- code += "import warnings; [print(f) for f in warnings.filters]"
+ code += "import warnings; [print(f) for f in warnings._get_filters()]"
rc, stdout, stderr = assert_python_ok("-c", code, __isolated=True)
stdout_lines = [line.strip() for line in stdout.splitlines()]
self.assertTrue(err.startswith(expected), ascii(err))
+class AsyncTests(BaseTest):
+ """Verifies that the catch_warnings() context manager behaves
+ as expected when used inside async co-routines. This requires
+ that the context_aware_warnings flag is enabled, so that
+ the context manager uses a context variable.
+ """
+
+ def setUp(self):
+ super().setUp()
+ self.module.resetwarnings()
+
+ @unittest.skipIf(not sys.flags.context_aware_warnings,
+ "requires context aware warnings")
+ def test_async_context(self):
+ import asyncio
+
+ # Events to force the execution interleaving we want.
+ step_a1 = asyncio.Event()
+ step_a2 = asyncio.Event()
+ step_b1 = asyncio.Event()
+ step_b2 = asyncio.Event()
+
+ async def run_a():
+ with self.module.catch_warnings(record=True) as w:
+ await step_a1.wait()
+ # The warning emitted here should be caught be the enclosing
+ # context manager.
+ self.module.warn('run_a warning', UserWarning)
+ step_b1.set()
+ await step_a2.wait()
+ self.assertEqual(len(w), 1)
+ self.assertEqual(w[0].message.args[0], 'run_a warning')
+ step_b2.set()
+
+ async def run_b():
+ with self.module.catch_warnings(record=True) as w:
+ step_a1.set()
+ await step_b1.wait()
+ # The warning emitted here should be caught be the enclosing
+ # context manager.
+ self.module.warn('run_b warning', UserWarning)
+ step_a2.set()
+ await step_b2.wait()
+ self.assertEqual(len(w), 1)
+ self.assertEqual(w[0].message.args[0], 'run_b warning')
+
+ async def run_tasks():
+ await asyncio.gather(run_a(), run_b())
+
+ asyncio.run(run_tasks())
+
+ @unittest.skipIf(not sys.flags.context_aware_warnings,
+ "requires context aware warnings")
+ def test_async_task_inherit(self):
+ """Check that a new asyncio task inherits warnings context from the
+ coroutine that spawns it.
+ """
+ import asyncio
+
+ step1 = asyncio.Event()
+ step2 = asyncio.Event()
+
+ async def run_child1():
+ await step1.wait()
+ # This should be recorded by the run_parent() catch_warnings
+ # context.
+ self.module.warn('child warning', UserWarning)
+ step2.set()
+
+ async def run_child2():
+ # This establishes a new catch_warnings() context. The
+ # run_child1() task should still be using the context from
+ # run_parent() if context-aware warnings are enabled.
+ with self.module.catch_warnings(record=True) as w:
+ step1.set()
+ await step2.wait()
+
+ async def run_parent():
+ with self.module.catch_warnings(record=True) as w:
+ await asyncio.gather(run_child1(), run_child2())
+ self.assertEqual(len(w), 1)
+ self.assertEqual(w[0].message.args[0], 'child warning')
+
+ asyncio.run(run_parent())
+
+
+class CAsyncTests(AsyncTests, unittest.TestCase):
+ module = c_warnings
+
+
+class PyAsyncTests(AsyncTests, unittest.TestCase):
+ module = py_warnings
+
+
+class ThreadTests(BaseTest):
+ """Verifies that the catch_warnings() context manager behaves as
+ expected when used within threads. This requires that both the
+ context_aware_warnings flag and thread_inherit_context flags are enabled.
+ """
+
+ ENABLE_THREAD_TESTS = (sys.flags.context_aware_warnings and
+ sys.flags.thread_inherit_context)
+
+ def setUp(self):
+ super().setUp()
+ self.module.resetwarnings()
+
+ @unittest.skipIf(not ENABLE_THREAD_TESTS,
+ "requires thread-safe warnings flags")
+ def test_threaded_context(self):
+ import threading
+
+ barrier = threading.Barrier(2)
+
+ def run_a():
+ with self.module.catch_warnings(record=True) as w:
+ barrier.wait()
+ # The warning emitted here should be caught be the enclosing
+ # context manager.
+ self.module.warn('run_a warning', UserWarning)
+ barrier.wait()
+ self.assertEqual(len(w), 1)
+ self.assertEqual(w[0].message.args[0], 'run_a warning')
+ # Should be caught be the catch_warnings() context manager of run_threads()
+ self.module.warn('main warning', UserWarning)
+
+ def run_b():
+ with self.module.catch_warnings(record=True) as w:
+ barrier.wait()
+ # The warning emitted here should be caught be the enclosing
+ # context manager.
+ barrier.wait()
+ self.module.warn('run_b warning', UserWarning)
+ self.assertEqual(len(w), 1)
+ self.assertEqual(w[0].message.args[0], 'run_b warning')
+ # Should be caught be the catch_warnings() context manager of run_threads()
+ self.module.warn('main warning', UserWarning)
+
+ def run_threads():
+ threads = [
+ threading.Thread(target=run_a),
+ threading.Thread(target=run_b),
+ ]
+ with self.module.catch_warnings(record=True) as w:
+ for thread in threads:
+ thread.start()
+ for thread in threads:
+ thread.join()
+ self.assertEqual(len(w), 2)
+ self.assertEqual(w[0].message.args[0], 'main warning')
+ self.assertEqual(w[1].message.args[0], 'main warning')
+
+ run_threads()
+
+
+class CThreadTests(ThreadTests, unittest.TestCase):
+ module = c_warnings
+
+
+class PyThreadTests(ThreadTests, unittest.TestCase):
+ module = py_warnings
+
+
class DeprecatedTests(PyPublicAPITests):
def test_dunder_deprecated(self):
@deprecated("A will go away soon")
import os as _os
import sys as _sys
import _thread
+import _contextvars
from time import monotonic as _time
from _weakrefset import WeakSet
_initialized = False
def __init__(self, group=None, target=None, name=None,
- args=(), kwargs=None, *, daemon=None):
+ args=(), kwargs=None, *, daemon=None, context=None):
"""This constructor should always be called with keyword arguments. Arguments are:
*group* should be None; reserved for future extension when a ThreadGroup
*kwargs* is a dictionary of keyword arguments for the target
invocation. Defaults to {}.
+ *context* is the contextvars.Context value to use for the thread.
+ The default value is None, which means to check
+ sys.flags.thread_inherit_context. If that flag is true, use a copy
+ of the context of the caller. If false, use an empty context. To
+ explicitly start with an empty context, pass a new instance of
+ contextvars.Context(). To explicitly start with a copy of the current
+ context, pass the value from contextvars.copy_context().
+
If a subclass overrides the constructor, it must make sure to invoke
the base class constructor (Thread.__init__()) before doing anything
else to the thread.
self._daemonic = daemon
else:
self._daemonic = current_thread().daemon
+ self._context = context
self._ident = None
if _HAVE_THREAD_NATIVE_ID:
self._native_id = None
with _active_limbo_lock:
_limbo[self] = self
+
+ if self._context is None:
+ # No context provided
+ if _sys.flags.thread_inherit_context:
+ # start with a copy of the context of the caller
+ self._context = _contextvars.copy_context()
+ else:
+ # start with an empty context
+ self._context = _contextvars.Context()
+
try:
# Start joinable thread
_start_joinable_thread(self._bootstrap, handle=self._handle,
_sys.setprofile(_profile_hook)
try:
- self.run()
+ self._context.run(self.run)
except:
self._invoke_excepthook(self)
finally:
-"""Python part of the warnings subsystem."""
-
import sys
+__all__ = [
+ "warn",
+ "warn_explicit",
+ "showwarning",
+ "formatwarning",
+ "filterwarnings",
+ "simplefilter",
+ "resetwarnings",
+ "catch_warnings",
+ "deprecated",
+]
+
+from _py_warnings import (
+ WarningMessage,
+ _DEPRECATED_MSG,
+ _OptionError,
+ _add_filter,
+ _deprecated,
+ _filters_mutated,
+ _filters_mutated_lock_held,
+ _filters_version,
+ _formatwarning_orig,
+ _formatwarnmsg,
+ _formatwarnmsg_impl,
+ _get_context,
+ _get_filters,
+ _getaction,
+ _getcategory,
+ _is_filename_to_skip,
+ _is_internal_filename,
+ _is_internal_frame,
+ _lock,
+ _new_context,
+ _next_external_frame,
+ _processoptions,
+ _set_context,
+ _set_module,
+ _setoption,
+ _setup_defaults,
+ _showwarning_orig,
+ _showwarnmsg,
+ _showwarnmsg_impl,
+ _use_context,
+ _warn_unawaited_coroutine,
+ _warnings_context,
+ catch_warnings,
+ defaultaction,
+ deprecated,
+ filters,
+ filterwarnings,
+ formatwarning,
+ onceregistry,
+ resetwarnings,
+ showwarning,
+ simplefilter,
+ warn,
+ warn_explicit,
+)
-__all__ = ["warn", "warn_explicit", "showwarning",
- "formatwarning", "filterwarnings", "simplefilter",
- "resetwarnings", "catch_warnings", "deprecated"]
-
-def showwarning(message, category, filename, lineno, file=None, line=None):
- """Hook to write a warning to a file; replace if you like."""
- msg = WarningMessage(message, category, filename, lineno, file, line)
- _showwarnmsg_impl(msg)
-
-def formatwarning(message, category, filename, lineno, line=None):
- """Function to format a warning the standard way."""
- msg = WarningMessage(message, category, filename, lineno, None, line)
- return _formatwarnmsg_impl(msg)
-
-def _showwarnmsg_impl(msg):
- file = msg.file
- if file is None:
- file = sys.stderr
- if file is None:
- # sys.stderr is None when run with pythonw.exe:
- # warnings get lost
- return
- text = _formatwarnmsg(msg)
- try:
- file.write(text)
- except OSError:
- # the file (probably stderr) is invalid - this warning gets lost.
- pass
-
-def _formatwarnmsg_impl(msg):
- category = msg.category.__name__
- s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"
-
- if msg.line is None:
- try:
- import linecache
- line = linecache.getline(msg.filename, msg.lineno)
- except Exception:
- # When a warning is logged during Python shutdown, linecache
- # and the import machinery don't work anymore
- line = None
- linecache = None
- else:
- line = msg.line
- if line:
- line = line.strip()
- s += " %s\n" % line
-
- if msg.source is not None:
- try:
- import tracemalloc
- # Logging a warning should not raise a new exception:
- # catch Exception, not only ImportError and RecursionError.
- except Exception:
- # don't suggest to enable tracemalloc if it's not available
- suggest_tracemalloc = False
- tb = None
- else:
- try:
- suggest_tracemalloc = not tracemalloc.is_tracing()
- tb = tracemalloc.get_object_traceback(msg.source)
- except Exception:
- # When a warning is logged during Python shutdown, tracemalloc
- # and the import machinery don't work anymore
- suggest_tracemalloc = False
- tb = None
-
- if tb is not None:
- s += 'Object allocated at (most recent call last):\n'
- for frame in tb:
- s += (' File "%s", lineno %s\n'
- % (frame.filename, frame.lineno))
-
- try:
- if linecache is not None:
- line = linecache.getline(frame.filename, frame.lineno)
- else:
- line = None
- except Exception:
- line = None
- if line:
- line = line.strip()
- s += ' %s\n' % line
- elif suggest_tracemalloc:
- s += (f'{category}: Enable tracemalloc to get the object '
- f'allocation traceback\n')
- return s
-
-# Keep a reference to check if the function was replaced
-_showwarning_orig = showwarning
-
-def _showwarnmsg(msg):
- """Hook to write a warning to a file; replace if you like."""
- try:
- sw = showwarning
- except NameError:
- pass
- else:
- if sw is not _showwarning_orig:
- # warnings.showwarning() was replaced
- if not callable(sw):
- raise TypeError("warnings.showwarning() must be set to a "
- "function or method")
-
- sw(msg.message, msg.category, msg.filename, msg.lineno,
- msg.file, msg.line)
- return
- _showwarnmsg_impl(msg)
-
-# Keep a reference to check if the function was replaced
-_formatwarning_orig = formatwarning
-
-def _formatwarnmsg(msg):
- """Function to format a warning the standard way."""
- try:
- fw = formatwarning
- except NameError:
- pass
- else:
- if fw is not _formatwarning_orig:
- # warnings.formatwarning() was replaced
- return fw(msg.message, msg.category,
- msg.filename, msg.lineno, msg.line)
- return _formatwarnmsg_impl(msg)
-
-def filterwarnings(action, message="", category=Warning, module="", lineno=0,
- append=False):
- """Insert an entry into the list of warnings filters (at the front).
-
- 'action' -- one of "error", "ignore", "always", "all", "default", "module",
- or "once"
- 'message' -- a regex that the warning message must match
- 'category' -- a class that the warning must be a subclass of
- 'module' -- a regex that the module name must match
- 'lineno' -- an integer line number, 0 matches all warnings
- 'append' -- if true, append to the list of filters
- """
- if action not in {"error", "ignore", "always", "all", "default", "module", "once"}:
- raise ValueError(f"invalid action: {action!r}")
- if not isinstance(message, str):
- raise TypeError("message must be a string")
- if not isinstance(category, type) or not issubclass(category, Warning):
- raise TypeError("category must be a Warning subclass")
- if not isinstance(module, str):
- raise TypeError("module must be a string")
- if not isinstance(lineno, int):
- raise TypeError("lineno must be an int")
- if lineno < 0:
- raise ValueError("lineno must be an int >= 0")
-
- if message or module:
- import re
-
- if message:
- message = re.compile(message, re.I)
- else:
- message = None
- if module:
- module = re.compile(module)
- else:
- module = None
-
- _add_filter(action, message, category, module, lineno, append=append)
-
-def simplefilter(action, category=Warning, lineno=0, append=False):
- """Insert a simple entry into the list of warnings filters (at the front).
-
- A simple filter matches all modules and messages.
- 'action' -- one of "error", "ignore", "always", "all", "default", "module",
- or "once"
- 'category' -- a class that the warning must be a subclass of
- 'lineno' -- an integer line number, 0 matches all warnings
- 'append' -- if true, append to the list of filters
- """
- if action not in {"error", "ignore", "always", "all", "default", "module", "once"}:
- raise ValueError(f"invalid action: {action!r}")
- if not isinstance(lineno, int):
- raise TypeError("lineno must be an int")
- if lineno < 0:
- raise ValueError("lineno must be an int >= 0")
- _add_filter(action, None, category, None, lineno, append=append)
-
-def _filters_mutated():
- # Even though this function is not part of the public API, it's used by
- # a fair amount of user code.
- with _lock:
- _filters_mutated_lock_held()
-
-def _add_filter(*item, append):
- with _lock:
- if not append:
- # Remove possible duplicate filters, so new one will be placed
- # in correct place. If append=True and duplicate exists, do nothing.
- try:
- filters.remove(item)
- except ValueError:
- pass
- filters.insert(0, item)
- else:
- if item not in filters:
- filters.append(item)
- _filters_mutated_lock_held()
-
-def resetwarnings():
- """Clear the list of warning filters, so that no filters are active."""
- with _lock:
- filters[:] = []
- _filters_mutated_lock_held()
-
-class _OptionError(Exception):
- """Exception used by option processing helpers."""
- pass
-
-# Helper to process -W options passed via sys.warnoptions
-def _processoptions(args):
- for arg in args:
- try:
- _setoption(arg)
- except _OptionError as msg:
- print("Invalid -W option ignored:", msg, file=sys.stderr)
-
-# Helper for _processoptions()
-def _setoption(arg):
- parts = arg.split(':')
- if len(parts) > 5:
- raise _OptionError("too many fields (max 5): %r" % (arg,))
- while len(parts) < 5:
- parts.append('')
- action, message, category, module, lineno = [s.strip()
- for s in parts]
- action = _getaction(action)
- category = _getcategory(category)
- if message or module:
- import re
- if message:
- message = re.escape(message)
- if module:
- module = re.escape(module) + r'\Z'
- if lineno:
- try:
- lineno = int(lineno)
- if lineno < 0:
- raise ValueError
- except (ValueError, OverflowError):
- raise _OptionError("invalid lineno %r" % (lineno,)) from None
- else:
- lineno = 0
- filterwarnings(action, message, category, module, lineno)
-
-# Helper for _setoption()
-def _getaction(action):
- if not action:
- return "default"
- for a in ('default', 'always', 'all', 'ignore', 'module', 'once', 'error'):
- if a.startswith(action):
- return a
- raise _OptionError("invalid action: %r" % (action,))
-
-# Helper for _setoption()
-def _getcategory(category):
- if not category:
- return Warning
- if '.' not in category:
- import builtins as m
- klass = category
- else:
- module, _, klass = category.rpartition('.')
- try:
- m = __import__(module, None, None, [klass])
- except ImportError:
- raise _OptionError("invalid module name: %r" % (module,)) from None
- try:
- cat = getattr(m, klass)
- except AttributeError:
- raise _OptionError("unknown warning category: %r" % (category,)) from None
- if not issubclass(cat, Warning):
- raise _OptionError("invalid warning category: %r" % (category,))
- return cat
-
-
-def _is_internal_filename(filename):
- return 'importlib' in filename and '_bootstrap' in filename
-
-
-def _is_filename_to_skip(filename, skip_file_prefixes):
- return any(filename.startswith(prefix) for prefix in skip_file_prefixes)
-
-
-def _is_internal_frame(frame):
- """Signal whether the frame is an internal CPython implementation detail."""
- return _is_internal_filename(frame.f_code.co_filename)
-
-
-def _next_external_frame(frame, skip_file_prefixes):
- """Find the next frame that doesn't involve Python or user internals."""
- frame = frame.f_back
- while frame is not None and (
- _is_internal_filename(filename := frame.f_code.co_filename) or
- _is_filename_to_skip(filename, skip_file_prefixes)):
- frame = frame.f_back
- return frame
-
-
-# Code typically replaced by _warnings
-def warn(message, category=None, stacklevel=1, source=None,
- *, skip_file_prefixes=()):
- """Issue a warning, or maybe ignore it or raise an exception."""
- # Check if message is already a Warning object
- if isinstance(message, Warning):
- category = message.__class__
- # Check category argument
- if category is None:
- category = UserWarning
- if not (isinstance(category, type) and issubclass(category, Warning)):
- raise TypeError("category must be a Warning subclass, "
- "not '{:s}'".format(type(category).__name__))
- if not isinstance(skip_file_prefixes, tuple):
- # The C version demands a tuple for implementation performance.
- raise TypeError('skip_file_prefixes must be a tuple of strs.')
- if skip_file_prefixes:
- stacklevel = max(2, stacklevel)
- # Get context information
- try:
- if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
- # If frame is too small to care or if the warning originated in
- # internal code, then do not try to hide any frames.
- frame = sys._getframe(stacklevel)
- else:
- frame = sys._getframe(1)
- # Look for one frame less since the above line starts us off.
- for x in range(stacklevel-1):
- frame = _next_external_frame(frame, skip_file_prefixes)
- if frame is None:
- raise ValueError
- except ValueError:
- globals = sys.__dict__
- filename = "<sys>"
- lineno = 0
- else:
- globals = frame.f_globals
- filename = frame.f_code.co_filename
- lineno = frame.f_lineno
- if '__name__' in globals:
- module = globals['__name__']
- else:
- module = "<string>"
- registry = globals.setdefault("__warningregistry__", {})
- warn_explicit(message, category, filename, lineno, module, registry,
- globals, source)
-
-def warn_explicit(message, category, filename, lineno,
- module=None, registry=None, module_globals=None,
- source=None):
- lineno = int(lineno)
- if module is None:
- module = filename or "<unknown>"
- if module[-3:].lower() == ".py":
- module = module[:-3] # XXX What about leading pathname?
- if isinstance(message, Warning):
- text = str(message)
- category = message.__class__
- else:
- text = message
- message = category(message)
- key = (text, category, lineno)
- with _lock:
- if registry is None:
- registry = {}
- if registry.get('version', 0) != _filters_version:
- registry.clear()
- registry['version'] = _filters_version
- # Quick test for common case
- if registry.get(key):
- return
- # Search the filters
- for item in filters:
- action, msg, cat, mod, ln = item
- if ((msg is None or msg.match(text)) and
- issubclass(category, cat) and
- (mod is None or mod.match(module)) and
- (ln == 0 or lineno == ln)):
- break
- else:
- action = defaultaction
- # Early exit actions
- if action == "ignore":
- return
-
- if action == "error":
- raise message
- # Other actions
- if action == "once":
- registry[key] = 1
- oncekey = (text, category)
- if onceregistry.get(oncekey):
- return
- onceregistry[oncekey] = 1
- elif action in {"always", "all"}:
- pass
- elif action == "module":
- registry[key] = 1
- altkey = (text, category, 0)
- if registry.get(altkey):
- return
- registry[altkey] = 1
- elif action == "default":
- registry[key] = 1
- else:
- # Unrecognized actions are errors
- raise RuntimeError(
- "Unrecognized action (%r) in warnings.filters:\n %s" %
- (action, item))
-
- # Prime the linecache for formatting, in case the
- # "file" is actually in a zipfile or something.
- import linecache
- linecache.getlines(filename, module_globals)
-
- # Print message and context
- msg = WarningMessage(message, category, filename, lineno, source=source)
- _showwarnmsg(msg)
-
-
-class WarningMessage(object):
-
- _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
- "line", "source")
-
- def __init__(self, message, category, filename, lineno, file=None,
- line=None, source=None):
- self.message = message
- self.category = category
- self.filename = filename
- self.lineno = lineno
- self.file = file
- self.line = line
- self.source = source
- self._category_name = category.__name__ if category else None
-
- def __str__(self):
- return ("{message : %r, category : %r, filename : %r, lineno : %s, "
- "line : %r}" % (self.message, self._category_name,
- self.filename, self.lineno, self.line))
-
-
-class catch_warnings(object):
-
- """A context manager that copies and restores the warnings filter upon
- exiting the context.
-
- The 'record' argument specifies whether warnings should be captured by a
- custom implementation of warnings.showwarning() and be appended to a list
- returned by the context manager. Otherwise None is returned by the context
- manager. The objects appended to the list are arguments whose attributes
- mirror the arguments to showwarning().
-
- The 'module' argument is to specify an alternative module to the module
- named 'warnings' and imported under that name. This argument is only useful
- when testing the warnings module itself.
-
- If the 'action' argument is not None, the remaining arguments are passed
- to warnings.simplefilter() as if it were called immediately on entering the
- context.
- """
-
- def __init__(self, *, record=False, module=None,
- action=None, category=Warning, lineno=0, append=False):
- """Specify whether to record warnings and if an alternative module
- should be used other than sys.modules['warnings'].
-
- """
- self._record = record
- self._module = sys.modules['warnings'] if module is None else module
- self._entered = False
- if action is None:
- self._filter = None
- else:
- self._filter = (action, category, lineno, append)
-
- def __repr__(self):
- args = []
- if self._record:
- args.append("record=True")
- if self._module is not sys.modules['warnings']:
- args.append("module=%r" % self._module)
- name = type(self).__name__
- return "%s(%s)" % (name, ", ".join(args))
-
- def __enter__(self):
- if self._entered:
- raise RuntimeError("Cannot enter %r twice" % self)
- self._entered = True
- with _lock:
- self._filters = self._module.filters
- self._module.filters = self._filters[:]
- self._module._filters_mutated_lock_held()
- self._showwarning = self._module.showwarning
- self._showwarnmsg_impl = self._module._showwarnmsg_impl
- if self._record:
- log = []
- self._module._showwarnmsg_impl = log.append
- # Reset showwarning() to the default implementation to make sure
- # that _showwarnmsg() calls _showwarnmsg_impl()
- self._module.showwarning = self._module._showwarning_orig
- else:
- log = None
- if self._filter is not None:
- simplefilter(*self._filter)
- return log
-
- def __exit__(self, *exc_info):
- if not self._entered:
- raise RuntimeError("Cannot exit %r without entering first" % self)
- with _lock:
- self._module.filters = self._filters
- self._module._filters_mutated_lock_held()
- self._module.showwarning = self._showwarning
- self._module._showwarnmsg_impl = self._showwarnmsg_impl
-
-
-class deprecated:
- """Indicate that a class, function or overload is deprecated.
-
- When this decorator is applied to an object, the type checker
- will generate a diagnostic on usage of the deprecated object.
-
- Usage:
-
- @deprecated("Use B instead")
- class A:
- pass
-
- @deprecated("Use g instead")
- def f():
- pass
-
- @overload
- @deprecated("int support is deprecated")
- def g(x: int) -> int: ...
- @overload
- def g(x: str) -> int: ...
-
- The warning specified by *category* will be emitted at runtime
- on use of deprecated objects. For functions, that happens on calls;
- for classes, on instantiation and on creation of subclasses.
- If the *category* is ``None``, no warning is emitted at runtime.
- The *stacklevel* determines where the
- warning is emitted. If it is ``1`` (the default), the warning
- is emitted at the direct caller of the deprecated object; if it
- is higher, it is emitted further up the stack.
- Static type checker behavior is not affected by the *category*
- and *stacklevel* arguments.
-
- The deprecation message passed to the decorator is saved in the
- ``__deprecated__`` attribute on the decorated object.
- If applied to an overload, the decorator
- must be after the ``@overload`` decorator for the attribute to
- exist on the overload as returned by ``get_overloads()``.
-
- See PEP 702 for details.
-
- """
- def __init__(
- self,
- message: str,
- /,
- *,
- category: type[Warning] | None = DeprecationWarning,
- stacklevel: int = 1,
- ) -> None:
- if not isinstance(message, str):
- raise TypeError(
- f"Expected an object of type str for 'message', not {type(message).__name__!r}"
- )
- self.message = message
- self.category = category
- self.stacklevel = stacklevel
-
- def __call__(self, arg, /):
- # Make sure the inner functions created below don't
- # retain a reference to self.
- msg = self.message
- category = self.category
- stacklevel = self.stacklevel
- if category is None:
- arg.__deprecated__ = msg
- return arg
- elif isinstance(arg, type):
- import functools
- from types import MethodType
-
- original_new = arg.__new__
-
- @functools.wraps(original_new)
- def __new__(cls, /, *args, **kwargs):
- if cls is arg:
- warn(msg, category=category, stacklevel=stacklevel + 1)
- if original_new is not object.__new__:
- return original_new(cls, *args, **kwargs)
- # Mirrors a similar check in object.__new__.
- elif cls.__init__ is object.__init__ and (args or kwargs):
- raise TypeError(f"{cls.__name__}() takes no arguments")
- else:
- return original_new(cls)
-
- arg.__new__ = staticmethod(__new__)
-
- original_init_subclass = arg.__init_subclass__
- # We need slightly different behavior if __init_subclass__
- # is a bound method (likely if it was implemented in Python)
- if isinstance(original_init_subclass, MethodType):
- original_init_subclass = original_init_subclass.__func__
-
- @functools.wraps(original_init_subclass)
- def __init_subclass__(*args, **kwargs):
- warn(msg, category=category, stacklevel=stacklevel + 1)
- return original_init_subclass(*args, **kwargs)
-
- arg.__init_subclass__ = classmethod(__init_subclass__)
- # Or otherwise, which likely means it's a builtin such as
- # object's implementation of __init_subclass__.
- else:
- @functools.wraps(original_init_subclass)
- def __init_subclass__(*args, **kwargs):
- warn(msg, category=category, stacklevel=stacklevel + 1)
- return original_init_subclass(*args, **kwargs)
-
- arg.__init_subclass__ = __init_subclass__
-
- arg.__deprecated__ = __new__.__deprecated__ = msg
- __init_subclass__.__deprecated__ = msg
- return arg
- elif callable(arg):
- import functools
- import inspect
-
- @functools.wraps(arg)
- def wrapper(*args, **kwargs):
- warn(msg, category=category, stacklevel=stacklevel + 1)
- return arg(*args, **kwargs)
-
- if inspect.iscoroutinefunction(arg):
- wrapper = inspect.markcoroutinefunction(wrapper)
-
- arg.__deprecated__ = wrapper.__deprecated__ = msg
- return wrapper
- else:
- raise TypeError(
- "@deprecated decorator with non-None category must be applied to "
- f"a class or callable, not {arg!r}"
- )
-
-
-_DEPRECATED_MSG = "{name!r} is deprecated and slated for removal in Python {remove}"
-
-def _deprecated(name, message=_DEPRECATED_MSG, *, remove, _version=sys.version_info):
- """Warn that *name* is deprecated or should be removed.
-
- RuntimeError is raised if *remove* specifies a major/minor tuple older than
- the current Python version or the same version but past the alpha.
-
- The *message* argument is formatted with *name* and *remove* as a Python
- version tuple (e.g. (3, 11)).
-
- """
- remove_formatted = f"{remove[0]}.{remove[1]}"
- if (_version[:2] > remove) or (_version[:2] == remove and _version[3] != "alpha"):
- msg = f"{name!r} was slated for removal after Python {remove_formatted} alpha"
- raise RuntimeError(msg)
- else:
- msg = message.format(name=name, remove=remove_formatted)
- warn(msg, DeprecationWarning, stacklevel=3)
-
-
-# Private utility function called by _PyErr_WarnUnawaitedCoroutine
-def _warn_unawaited_coroutine(coro):
- msg_lines = [
- f"coroutine '{coro.__qualname__}' was never awaited\n"
- ]
- if coro.cr_origin is not None:
- import linecache, traceback
- def extract():
- for filename, lineno, funcname in reversed(coro.cr_origin):
- line = linecache.getline(filename, lineno)
- yield (filename, lineno, funcname, line)
- msg_lines.append("Coroutine created at (most recent call last)\n")
- msg_lines += traceback.format_list(list(extract()))
- msg = "".join(msg_lines).rstrip("\n")
- # Passing source= here means that if the user happens to have tracemalloc
- # enabled and tracking where the coroutine was created, the warning will
- # contain that traceback. This does mean that if they have *both*
- # coroutine origin tracking *and* tracemalloc enabled, they'll get two
- # partially-redundant tracebacks. If we wanted to be clever we could
- # probably detect this case and avoid it, but for now we don't bother.
- warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
-
-
-# filters contains a sequence of filter 5-tuples
-# The components of the 5-tuple are:
-# - an action: error, ignore, always, all, default, module, or once
-# - a compiled regex that must match the warning message
-# - a class representing the warning category
-# - a compiled regex that must match the module that is being warned
-# - a line number for the line being warning, or 0 to mean any line
-# If either if the compiled regexs are None, match anything.
try:
- from _warnings import (filters, _defaultaction, _onceregistry,
- warn, warn_explicit,
- _filters_mutated_lock_held,
- _acquire_lock, _release_lock,
+ # Try to use the C extension, this will replace some parts of the
+ # _py_warnings implementation imported above.
+ from _warnings import (
+ _acquire_lock,
+ _defaultaction as defaultaction,
+ _filters_mutated_lock_held,
+ _onceregistry as onceregistry,
+ _release_lock,
+ _warnings_context,
+ filters,
+ warn,
+ warn_explicit,
)
- defaultaction = _defaultaction
- onceregistry = _onceregistry
+
_warnings_defaults = True
class _Lock:
_release_lock()
_lock = _Lock()
-
except ImportError:
- filters = []
- defaultaction = "default"
- onceregistry = {}
-
- import _thread
-
- _lock = _thread.RLock()
-
- _filters_version = 1
-
- def _filters_mutated_lock_held():
- global _filters_version
- _filters_version += 1
-
_warnings_defaults = False
# Module initialization
+_set_module(sys.modules[__name__])
_processoptions(sys.warnoptions)
if not _warnings_defaults:
- # Several warning categories are ignored by default in regular builds
- if not hasattr(sys, 'gettotalrefcount'):
- filterwarnings("default", category=DeprecationWarning,
- module="__main__", append=1)
- simplefilter("ignore", category=DeprecationWarning, append=1)
- simplefilter("ignore", category=PendingDeprecationWarning, append=1)
- simplefilter("ignore", category=ImportWarning, append=1)
- simplefilter("ignore", category=ResourceWarning, append=1)
+ _setup_defaults()
del _warnings_defaults
+del _setup_defaults
# Python
PYTHON_OBJS= \
+ Python/_contextvars.o \
Python/_warnings.o \
Python/Python-ast.o \
Python/Python-tokenize.o \
--- /dev/null
+Add the :data:`sys.flags.thread_inherit_context` flag.
+
+* This flag is set to true by default on the free-threaded build
+ and false otherwise. If the flag is true, starting a new thread using
+ :class:`threading.Thread` will, by default, use a copy of the
+ :class:`contextvars.Context` from the caller of
+ :meth:`threading.Thread.start` rather than using an empty context.
+
+* Add the :option:`-X thread_inherit_context <-X>` command-line option and
+ :envvar:`PYTHON_THREAD_INHERIT_CONTEXT` environment variable, which set the
+ :data:`~sys.flags.thread_inherit_context` flag.
+
+* Add the ``context`` keyword parameter to :class:`~threading.Thread`. It can
+ be used to explicitly pass a context value to be used by a new thread.
+
+* Make the ``_contextvars`` module built-in.
--- /dev/null
+Make :class:`warnings.catch_warnings` use a context variable for holding
+the warning filtering state if the :data:`sys.flags.context_aware_warnings`
+flag is set to true. This makes using the context manager thread-safe in
+multi-threaded programs. The flag is true by default in free-threaded builds
+and is otherwise false. The value of the flag can be overridden by the
+the :option:`-X context_aware_warnings <-X>` command-line option or by the
+:envvar:`PYTHON_CONTEXT_AWARE_WARNINGS` environment variable.
#_asyncio _asynciomodule.c
#_bisect _bisectmodule.c
-#_contextvars _contextvarsmodule.c
#_csv _csv.c
#_datetime _datetimemodule.c
#_decimal _decimal/_decimal.c
@MODULE_ARRAY_TRUE@array arraymodule.c
@MODULE__ASYNCIO_TRUE@_asyncio _asynciomodule.c
@MODULE__BISECT_TRUE@_bisect _bisectmodule.c
-@MODULE__CONTEXTVARS_TRUE@_contextvars _contextvarsmodule.c
@MODULE__CSV_TRUE@_csv _csv.c
@MODULE__HEAPQ_TRUE@_heapq _heapqmodule.c
@MODULE__JSON_TRUE@_json _json.c
extern PyObject* PyInit_gc(void);
extern PyObject* PyInit__ast(void);
extern PyObject* PyInit__tokenize(void);
+extern PyObject* PyInit__contextvars(void);
extern PyObject* _PyWarnings_Init(void);
extern PyObject* PyInit__string(void);
/* This lives in gcmodule.c */
{"gc", PyInit_gc},
+ /* This lives in Python/_contextvars.c */
+ {"_contextvars", PyInit__contextvars},
+
/* This lives in _warnings.c */
{"_warnings", _PyWarnings_Init},
</ClCompile>
<ClCompile Include="..\Modules\_codecsmodule.c" />
<ClCompile Include="..\Modules\_collectionsmodule.c" />
- <ClCompile Include="..\Modules\_contextvarsmodule.c" />
<ClCompile Include="..\Modules\_csv.c" />
<ClCompile Include="..\Modules\_functoolsmodule.c" />
<ClCompile Include="..\Modules\_hacl\Hacl_Hash_MD5.c" />
<ClCompile Include="..\PC\config.c" />
<ClCompile Include="..\PC\msvcrtmodule.c" />
<ClCompile Include="..\Python\pyhash.c" />
+ <ClCompile Include="..\Python\_contextvars.c" />
<ClCompile Include="..\Python\_warnings.c" />
<ClCompile Include="..\Python\asdl.c" />
<ClCompile Include="..\Python\assemble.c" />
<ClCompile Include="..\PC\msvcrtmodule.c">
<Filter>PC</Filter>
</ClCompile>
+ <ClCompile Include="..\Python\_contextvars.c">
+ <Filter>Python</Filter>
+ </ClCompile>
<ClCompile Include="..\Python\_warnings.c">
<Filter>Python</Filter>
</ClCompile>
<ClCompile Include="..\Objects\odictobject.c">
<Filter>Objects</Filter>
</ClCompile>
- <ClCompile Include="..\Modules\_contextvarsmodule.c">
- <Filter>Modules</Filter>
- </ClCompile>
<ClCompile Include="$(zlibDir)\adler32.c">
<Filter>Modules\zlib</Filter>
</ClCompile>
#include "Python.h"
-#include "clinic/_contextvarsmodule.c.h"
+#include "clinic/_contextvars.c.h"
/*[clinic input]
module _contextvars
Py_CLEAR(st->filters);
Py_CLEAR(st->once_registry);
Py_CLEAR(st->default_action);
+ Py_CLEAR(st->context);
}
#ifndef Py_DEBUG
}
}
+ if (st->context == NULL) {
+ st->context = PyContextVar_New("_warnings_context", NULL);
+ if (st->context == NULL) {
+ return -1;
+ }
+ }
+
st->filters_version = 0;
return 0;
}
return PyMutex_IsLocked(&st->lock.mutex);
}
+static PyObject *
+get_warnings_context(PyInterpreterState *interp)
+{
+ WarningsState *st = warnings_get_state(interp);
+ assert(PyContextVar_CheckExact(st->context));
+ PyObject *ctx;
+ if (PyContextVar_Get(st->context, NULL, &ctx) < 0) {
+ return NULL;
+ }
+ if (ctx == NULL) {
+ Py_RETURN_NONE;
+ }
+ return ctx;
+}
+
+static PyObject *
+get_warnings_context_filters(PyInterpreterState *interp)
+{
+ PyObject *ctx = get_warnings_context(interp);
+ if (ctx == NULL) {
+ return NULL;
+ }
+ if (ctx == Py_None) {
+ Py_RETURN_NONE;
+ }
+ PyObject *context_filters = PyObject_GetAttr(ctx, &_Py_ID(_filters));
+ Py_DECREF(ctx);
+ if (context_filters == NULL) {
+ return NULL;
+ }
+ if (!PyList_Check(context_filters)) {
+ PyErr_SetString(PyExc_ValueError,
+ "_filters of warnings._warnings_context must be a list");
+ Py_DECREF(context_filters);
+ return NULL;
+ }
+ return context_filters;
+}
+
+// Returns a borrowed reference to the list.
+static PyObject *
+get_warnings_filters(PyInterpreterState *interp)
+{
+ WarningsState *st = warnings_get_state(interp);
+ PyObject *warnings_filters = GET_WARNINGS_ATTR(interp, filters, 0);
+ if (warnings_filters == NULL) {
+ if (PyErr_Occurred())
+ return NULL;
+ }
+ else {
+ Py_SETREF(st->filters, warnings_filters);
+ }
+
+ PyObject *filters = st->filters;
+ if (filters == NULL || !PyList_Check(filters)) {
+ PyErr_SetString(PyExc_ValueError,
+ MODULE_NAME ".filters must be a list");
+ return NULL;
+ }
+ return filters;
+}
+
/*[clinic input]
_acquire_lock as warnings_acquire_lock
return default_action;
}
-
-/* The item is a new reference. */
-static PyObject*
-get_filter(PyInterpreterState *interp, PyObject *category,
- PyObject *text, Py_ssize_t lineno,
- PyObject *module, PyObject **item)
-{
- WarningsState *st = warnings_get_state(interp);
- assert(st != NULL);
-
- assert(warnings_lock_held(st));
-
- PyObject *warnings_filters = GET_WARNINGS_ATTR(interp, filters, 0);
- if (warnings_filters == NULL) {
- if (PyErr_Occurred())
- return NULL;
- }
- else {
- Py_SETREF(st->filters, warnings_filters);
- }
-
- PyObject *filters = st->filters;
- if (filters == NULL || !PyList_Check(filters)) {
- PyErr_SetString(PyExc_ValueError,
- MODULE_NAME ".filters must be a list");
- return NULL;
- }
-
- /* WarningsState.filters could change while we are iterating over it. */
+/* Search filters list of match, returns false on error. If no match
+ * then 'matched_action' is NULL. */
+static bool
+filter_search(PyInterpreterState *interp, PyObject *category,
+ PyObject *text, Py_ssize_t lineno,
+ PyObject *module, char *list_name, PyObject *filters,
+ PyObject **item, PyObject **matched_action) {
+ bool result = true;
+ *matched_action = NULL;
+ /* Avoid the filters list changing while we iterate over it. */
+ Py_BEGIN_CRITICAL_SECTION(filters);
for (Py_ssize_t i = 0; i < PyList_GET_SIZE(filters); i++) {
PyObject *tmp_item, *action, *msg, *cat, *mod, *ln_obj;
Py_ssize_t ln;
tmp_item = PyList_GET_ITEM(filters, i);
if (!PyTuple_Check(tmp_item) || PyTuple_GET_SIZE(tmp_item) != 5) {
PyErr_Format(PyExc_ValueError,
- MODULE_NAME ".filters item %zd isn't a 5-tuple", i);
- return NULL;
+ "warnings.%s item %zd isn't a 5-tuple", list_name, i);
+ result = false;
+ break;
}
/* Python code: action, msg, cat, mod, ln = item */
"action must be a string, not '%.200s'",
Py_TYPE(action)->tp_name);
Py_DECREF(tmp_item);
- return NULL;
+ result = false;
+ break;
}
good_msg = check_matched(interp, msg, text);
if (good_msg == -1) {
Py_DECREF(tmp_item);
- return NULL;
+ result = false;
+ break;
}
good_mod = check_matched(interp, mod, module);
if (good_mod == -1) {
Py_DECREF(tmp_item);
- return NULL;
+ result = false;
+ break;
}
is_subclass = PyObject_IsSubclass(category, cat);
if (is_subclass == -1) {
Py_DECREF(tmp_item);
- return NULL;
+ result = false;
+ break;
}
ln = PyLong_AsSsize_t(ln_obj);
if (ln == -1 && PyErr_Occurred()) {
Py_DECREF(tmp_item);
- return NULL;
+ result = false;
+ break;
}
if (good_msg && is_subclass && good_mod && (ln == 0 || lineno == ln)) {
*item = tmp_item;
- return action;
+ *matched_action = action;
+ result = true;
+ break;
}
Py_DECREF(tmp_item);
}
+ Py_END_CRITICAL_SECTION();
+ return result;
+}
+
+/* The item is a new reference. */
+static PyObject*
+get_filter(PyInterpreterState *interp, PyObject *category,
+ PyObject *text, Py_ssize_t lineno,
+ PyObject *module, PyObject **item)
+{
+#ifdef Py_DEBUG
+ WarningsState *st = warnings_get_state(interp);
+ assert(st != NULL);
+ assert(warnings_lock_held(st));
+#endif
+
+ /* check _warning_context _filters list */
+ PyObject *context_filters = get_warnings_context_filters(interp);
+ if (context_filters == NULL) {
+ return NULL;
+ }
+ bool use_global_filters = false;
+ if (context_filters == Py_None) {
+ use_global_filters = true;
+ } else {
+ PyObject *context_action = NULL;
+ if (!filter_search(interp, category, text, lineno, module, "_warnings_context _filters",
+ context_filters, item, &context_action)) {
+ Py_DECREF(context_filters);
+ return NULL;
+ }
+ Py_DECREF(context_filters);
+ if (context_action != NULL) {
+ return context_action;
+ }
+ }
+
+ PyObject *action;
+
+ if (use_global_filters) {
+ /* check warnings.filters list */
+ PyObject *filters = get_warnings_filters(interp);
+ if (filters == NULL) {
+ return NULL;
+ }
+ if (!filter_search(interp, category, text, lineno, module, "filters",
+ filters, item, &action)) {
+ return NULL;
+ }
+ if (action != NULL) {
+ return action;
+ }
+ }
- PyObject *action = get_default_action(interp);
+ action = get_default_action(interp);
if (action != NULL) {
*item = Py_NewRef(Py_None);
return action;
if (PyModule_AddObjectRef(module, "_defaultaction", st->default_action) < 0) {
return -1;
}
+ if (PyModule_AddObjectRef(module, "_warnings_context", st->context) < 0) {
+ return -1;
+ }
return 0;
}
SPEC(filesystem_errors, WSTR, READ_ONLY, NO_SYS),
SPEC(hash_seed, ULONG, READ_ONLY, NO_SYS),
SPEC(home, WSTR_OPT, READ_ONLY, NO_SYS),
+ SPEC(thread_inherit_context, INT, READ_ONLY, NO_SYS),
+ SPEC(context_aware_warnings, INT, READ_ONLY, NO_SYS),
SPEC(import_time, BOOL, READ_ONLY, NO_SYS),
SPEC(install_signal_handlers, BOOL, READ_ONLY, NO_SYS),
SPEC(isolated, BOOL, READ_ONLY, NO_SYS), // sys.flags.isolated
PYTHON_TLBC\n"
#endif
"\
+-X thread_inherit_context=[0|1]: enable (1) or disable (0) threads inheriting\n\
+ context vars by default; enabled by default in the free-threaded\n\
+ build and disabled otherwise; also PYTHON_THREAD_INHERIT_CONTEXT\n\
+-X context_aware_warnings=[0|1]: if true (1) then the warnings module will\n\
+ use a context variables; if false (0) then the warnings module will\n\
+ use module globals, which is not concurrent-safe; set to true for\n\
+ free-threaded builds and false otherwise; also\n\
+ PYTHON_CONTEXT_AWARE_WARNINGS\n\
-X tracemalloc[=N]: trace Python memory allocations; N sets a traceback limit\n \
of N frames (default: 1); also PYTHONTRACEMALLOC=N\n\
-X utf8[=0|1]: enable (1) or disable (0) UTF-8 mode; also PYTHONUTF8\n\
#ifdef Py_GIL_DISABLED
"PYTHON_TLBC : when set to 0, disables thread-local bytecode (-X tlbc)\n"
#endif
+"PYTHON_THREAD_INHERIT_CONTEXT: if true (1), threads inherit context vars\n"
+" (-X thread_inherit_context)\n"
+"PYTHON_CONTEXT_AWARE_WARNINGS: if true (1), enable thread-safe warnings module\n"
+" behaviour (-X context_aware_warnings)\n"
"PYTHONTRACEMALLOC: trace Python memory allocations (-X tracemalloc)\n"
"PYTHONUNBUFFERED: disable stdout/stderr buffering (-u)\n"
"PYTHONUTF8 : control the UTF-8 mode (-X utf8)\n"
assert(config->cpu_count != 0);
// config->use_frozen_modules is initialized later
// by _PyConfig_InitImportConfig().
+ assert(config->thread_inherit_context >= 0);
+ assert(config->context_aware_warnings >= 0);
#ifdef __APPLE__
assert(config->use_system_logger >= 0);
#endif
config->_is_python_build = 0;
config->code_debug_ranges = 1;
config->cpu_count = -1;
+#ifdef Py_GIL_DISABLED
+ config->thread_inherit_context = 1;
+ config->context_aware_warnings = 1;
+#else
+ config->thread_inherit_context = 0;
+ config->context_aware_warnings = 0;
+#endif
#ifdef __APPLE__
config->use_system_logger = USE_SYSTEM_LOGGER_DEFAULT;
#endif
#ifdef MS_WINDOWS
config->legacy_windows_stdio = 0;
#endif
+#ifdef Py_GIL_DISABLED
+ config->thread_inherit_context = 1;
+ config->context_aware_warnings = 1;
+#else
+ config->thread_inherit_context = 0;
+ config->context_aware_warnings = 0;
+#endif
#ifdef __APPLE__
config->use_system_logger = USE_SYSTEM_LOGGER_DEFAULT;
#endif
config->int_max_str_digits = _PY_LONG_DEFAULT_MAX_STR_DIGITS;
config->safe_path = 1;
config->pathconfig_warnings = 0;
+#ifdef Py_GIL_DISABLED
+ config->thread_inherit_context = 1;
+#else
+ config->thread_inherit_context = 0;
+#endif
#ifdef MS_WINDOWS
config->legacy_windows_stdio = 0;
#endif
"n must be greater than 0");
}
+static PyStatus
+config_init_thread_inherit_context(PyConfig *config)
+{
+ const char *env = config_get_env(config, "PYTHON_THREAD_INHERIT_CONTEXT");
+ if (env) {
+ int enabled;
+ if (_Py_str_to_int(env, &enabled) < 0 || (enabled < 0) || (enabled > 1)) {
+ return _PyStatus_ERR(
+ "PYTHON_THREAD_INHERIT_CONTEXT=N: N is missing or invalid");
+ }
+ config->thread_inherit_context = enabled;
+ }
+
+ const wchar_t *xoption = config_get_xoption(config, L"thread_inherit_context");
+ if (xoption) {
+ int enabled;
+ const wchar_t *sep = wcschr(xoption, L'=');
+ if (!sep || (config_wstr_to_int(sep + 1, &enabled) < 0) || (enabled < 0) || (enabled > 1)) {
+ return _PyStatus_ERR(
+ "-X thread_inherit_context=n: n is missing or invalid");
+ }
+ config->thread_inherit_context = enabled;
+ }
+ return _PyStatus_OK();
+}
+
+static PyStatus
+config_init_context_aware_warnings(PyConfig *config)
+{
+ const char *env = config_get_env(config, "PYTHON_CONTEXT_AWARE_WARNINGS");
+ if (env) {
+ int enabled;
+ if (_Py_str_to_int(env, &enabled) < 0 || (enabled < 0) || (enabled > 1)) {
+ return _PyStatus_ERR(
+ "PYTHON_CONTEXT_AWARE_WARNINGS=N: N is missing or invalid");
+ }
+ config->context_aware_warnings = enabled;
+ }
+
+ const wchar_t *xoption = config_get_xoption(config, L"context_aware_warnings");
+ if (xoption) {
+ int enabled;
+ const wchar_t *sep = wcschr(xoption, L'=');
+ if (!sep || (config_wstr_to_int(sep + 1, &enabled) < 0) || (enabled < 0) || (enabled > 1)) {
+ return _PyStatus_ERR(
+ "-X context_aware_warnings=n: n is missing or invalid");
+ }
+ config->context_aware_warnings = enabled;
+ }
+ return _PyStatus_OK();
+}
+
static PyStatus
config_init_tlbc(PyConfig *config)
{
}
#endif
+ status = config_init_thread_inherit_context(config);
+ if (_PyStatus_EXCEPTION(status)) {
+ return status;
+ }
+
+ status = config_init_context_aware_warnings(config);
+ if (_PyStatus_EXCEPTION(status)) {
+ return status;
+ }
+
status = config_init_tlbc(config);
if (_PyStatus_EXCEPTION(status)) {
return status;
"_posixshmem",
"_posixsubprocess",
"_py_abc",
+"_py_warnings",
"_pydatetime",
"_pydecimal",
"_pyio",
{"safe_path", "-P"},
{"int_max_str_digits", "-X int_max_str_digits"},
{"gil", "-X gil"},
+ {"thread_inherit_context", "-X thread_inherit_context"},
+ {"context_aware_warnings", "-X context_aware_warnings"},
{0}
};
#else
SetFlagObj(PyLong_FromLong(1));
#endif
+ SetFlag(config->thread_inherit_context);
+ SetFlag(config->context_aware_warnings);
#undef SetFlagObj
#undef SetFlag
return 0;
MODULE__HEAPQ_TRUE
MODULE__CSV_FALSE
MODULE__CSV_TRUE
-MODULE__CONTEXTVARS_FALSE
-MODULE__CONTEXTVARS_TRUE
MODULE__BISECT_FALSE
MODULE__BISECT_TRUE
MODULE__ASYNCIO_FALSE
-fi
-
-
- if test "$py_cv_module__contextvars" != "n/a"
-then :
- py_cv_module__contextvars=yes
-fi
- if test "$py_cv_module__contextvars" = yes; then
- MODULE__CONTEXTVARS_TRUE=
- MODULE__CONTEXTVARS_FALSE='#'
-else
- MODULE__CONTEXTVARS_TRUE='#'
- MODULE__CONTEXTVARS_FALSE=
-fi
-
- as_fn_append MODULE_BLOCK "MODULE__CONTEXTVARS_STATE=$py_cv_module__contextvars$as_nl"
- if test "x$py_cv_module__contextvars" = xyes
-then :
-
-
-
-
fi
as_fn_error $? "conditional \"MODULE__BISECT\" was never defined.
Usually this means the macro was only invoked conditionally." "$LINENO" 5
fi
-if test -z "${MODULE__CONTEXTVARS_TRUE}" && test -z "${MODULE__CONTEXTVARS_FALSE}"; then
- as_fn_error $? "conditional \"MODULE__CONTEXTVARS\" was never defined.
-Usually this means the macro was only invoked conditionally." "$LINENO" 5
-fi
if test -z "${MODULE__CSV_TRUE}" && test -z "${MODULE__CSV_FALSE}"; then
as_fn_error $? "conditional \"MODULE__CSV\" was never defined.
Usually this means the macro was only invoked conditionally." "$LINENO" 5
PY_STDLIB_MOD_SIMPLE([array])
PY_STDLIB_MOD_SIMPLE([_asyncio])
PY_STDLIB_MOD_SIMPLE([_bisect])
-PY_STDLIB_MOD_SIMPLE([_contextvars])
PY_STDLIB_MOD_SIMPLE([_csv])
PY_STDLIB_MOD_SIMPLE([_heapq])
PY_STDLIB_MOD_SIMPLE([_json])