return rule
+def warns_if(predicate, expression, assert_):
+    """Return a :class:`compound` rule that, when *predicate* matches the
+    active test configuration, expects a warning whose message matches
+    *expression* during the decorated test.
+
+    *assert_* is stored alongside *expression* and later forwarded as the
+    ``assert_`` keyword to ``expect_warnings`` when the rule wraps the
+    test function.
+    """
+    rule = compound()
+    # normalize *predicate* into a predicate object usable as a dict key
+    # and callable against a config (see _as_predicate)
+    pred = _as_predicate(predicate)
+    rule.warns[pred] = (expression, assert_)
+    return rule
+
+
+
class compound:
def __init__(self):
self.fails = set()
self.skips = set()
+ self.warns = {}
def __add__(self, other):
return self.add(other)
copy = compound()
copy.fails.update(self.fails)
copy.skips.update(self.skips)
+ copy.warns.update(self.warns)
for other in others:
copy.fails.update(other.fails)
copy.skips.update(other.skips)
+ copy.warns.update(other.warns)
return copy
def not_(self):
copy = compound()
copy.fails.update(NotPredicate(fail) for fail in self.fails)
copy.skips.update(NotPredicate(skip) for skip in self.skips)
+ copy.warns.update(
+ {
+ NotPredicate(warn): element
+ for warn, element in self.warns.items()
+ }
+ )
return copy
@property
else:
return True
+    def matching_warnings(self, config):
+        """Return the warning-expression messages whose predicate matches
+        the given *config*.
+
+        Only the stored ``message`` portion of each ``(message, assert_)``
+        entry is returned; the ``assert_`` flag is not used here.
+        """
+        return [
+            message
+            for predicate, (message, assert_) in self.warns.items()
+            if predicate(config)
+        ]
+
def matching_config_reasons(self, config):
return [
predicate._as_string(config)
def _extend(self, other):
self.skips.update(other.skips)
self.fails.update(other.fails)
+ self.warns.update(other.warns)
def __call__(self, fn):
if hasattr(fn, "_sa_exclusion_extend"):
)
config.skip_test(msg)
+ if self.warns:
+ from .assertions import expect_warnings
+
+ @contextlib.contextmanager
+ def _expect_warnings():
+ with contextlib.ExitStack() as stack:
+ for expression, assert_ in self.warns.values():
+ stack.enter_context(
+ expect_warnings(expression, assert_=assert_)
+ )
+ yield
+
+ ctx = _expect_warnings()
+ else:
+ ctx = contextlib.nullcontext()
+
try:
- return_value = fn(*args, **kw)
+ with ctx:
+ return_value = fn(*args, **kw)
except Exception as ex:
self._expect_failure(cfg, ex, name=fn.__name__)
else:
_current_class = None
+_current_warning_context = None
+
def pytest_runtest_setup(item):
from sqlalchemy.testing import asyncio
# databases, so we run this outside of the pytest fixture system altogether
# and ensure asyncio greenlet if any engines are async
- global _current_class
+ global _current_class, _current_warning_context
if isinstance(item, pytest.Function) and _current_class is None:
asyncio._maybe_async_provisioning(
)
_current_class = item.getparent(pytest.Class)
+ if hasattr(_current_class.cls, "__warnings__"):
+ import warnings
+
+ _current_warning_context = warnings.catch_warnings()
+ _current_warning_context.__enter__()
+ for warning_message in _current_class.cls.__warnings__:
+ warnings.filterwarnings("ignore", warning_message)
+
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_teardown(item, nextitem):
# pytest_runtest_setup since the class has not yet been setup at that
# time.
# See https://github.com/pytest-dev/pytest/issues/9343
- global _current_class, _current_report
+
+ global _current_class, _current_report, _current_warning_context
if _current_class is not None and (
# last test or a new class
nextitem is None
or nextitem.getparent(pytest.Class) is not _current_class
):
+
+ if _current_warning_context is not None:
+ _current_warning_context.__exit__(None, None, None)
+ _current_warning_context = None
+
_current_class = None
try:
def mark_base_test_class(self):
return pytest.mark.usefixtures(
- "setup_class_methods", "setup_test_methods"
+ "setup_class_methods",
+ "setup_test_methods",
)
_combination_id_fns = {