import textwrap
import threading
import types
+from types import CodeType
from typing import Any
from typing import Callable
from typing import cast
_warnings_warn(msg, exc.SAWarning)
+_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {}
+
+
+def tag_method_for_warnings(
+ message: str, category: Type[Warning]
+) -> Callable[[_F], _F]:
+ def go(fn):
+ _warning_tags[fn.__code__] = (message, category)
+ return fn
+
+ return go
+
+
_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)")
# ok, but don't crash
stacklevel = 0
else:
- # using __name__ here requires that we have __name__ in the
- # __globals__ of the decorated string functions we make also.
- # we generate this using {"__name__": fn.__module__}
- while frame is not None and re.match(
- _not_sa_pattern, frame.f_globals.get("__name__", "")
- ):
+ stacklevel_found = warning_tag_found = False
+ while frame is not None:
+ # using __name__ here requires that we have __name__ in the
+ # __globals__ of the decorated string functions we make also.
+ # we generate this using {"__name__": fn.__module__}
+ if not stacklevel_found and not re.match(
+ _not_sa_pattern, frame.f_globals.get("__name__", "")
+ ):
+ # stop incrementing stack level if an out-of-SQLA line
+ # were found.
+ stacklevel_found = True
+
+ # however, for the warning tag thing, we have to keep
+ # scanning up the whole traceback
+
+ if frame.f_code in _warning_tags:
+ warning_tag_found = True
+ (_prefix, _category) = _warning_tags[frame.f_code]
+ category = category or _category
+ message = f"{message} ({_prefix})"
+
frame = frame.f_back # type: ignore[assignment]
- stacklevel += 1
+
+ if not stacklevel_found:
+ stacklevel += 1
+ elif stacklevel_found and warning_tag_found:
+ break
if category is not None:
warnings.warn(message, category, stacklevel=stacklevel + 1)
+import re
+
from sqlalchemy import Column
+from sqlalchemy import event
+from sqlalchemy import ForeignKey
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import aliased
from sqlalchemy.orm import clear_mappers
+from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.orm import synonym
from sqlalchemy.orm import util as orm_util
from .inheritance import _poly_fixtures
+class ContextualWarningsTest(fixtures.TestBase):
+ """
+ Test for #7305
+
+ """
+
+ @testing.fixture
+ def plain_fixture(cls, decl_base):
+ class Foo(decl_base):
+ __tablename__ = "foo"
+ id = Column(Integer, primary_key=True)
+
+ decl_base.metadata.create_all(testing.db)
+ return Foo
+
+ @testing.fixture
+ def overlap_fixture(cls, decl_base):
+ class Foo(decl_base):
+ __tablename__ = "foo"
+ id = Column(Integer, primary_key=True)
+ bars = relationship(
+ "Bar",
+ primaryjoin="Foo.id==Bar.foo_id",
+ )
+
+ class Bar(decl_base):
+ __tablename__ = "bar"
+ id = Column(Integer, primary_key=True)
+ foo_id = Column(Integer, ForeignKey("foo.id"))
+ foos = relationship(
+ "Foo",
+ primaryjoin="Bar.foo_id==Foo.id",
+ )
+
+ return Foo, Bar
+
+ def test_configure_mappers_explicit(self, overlap_fixture, decl_base):
+ with expect_warnings(
+ re.escape(
+ "relationship 'Bar.foos' will copy column foo.id to column "
+ "bar.foo_id, which conflicts with relationship(s): 'Foo.bars' "
+ "(copies foo.id to bar.foo_id). "
+ ),
+ raise_on_any_unexpected=True,
+ ):
+ decl_base.registry.configure()
+
+ def test_configure_mappers_implicit_aliased(self, overlap_fixture):
+ Foo, Bar = overlap_fixture
+ with expect_warnings(
+ re.escape(
+ "relationship 'Bar.foos' will copy column foo.id "
+ "to column bar.foo_id, which conflicts with"
+ )
+ + ".*"
+ + re.escape(
+ "(This warning originated from the `configure_mappers()` "
+ "process, which was "
+ "invoked automatically in response to a user-initiated "
+ "operation.)"
+ ),
+ raise_on_any_unexpected=True,
+ ):
+ FooAlias = aliased(Foo)
+ assert hasattr(FooAlias, "bars")
+
+ def test_configure_mappers_implicit_instantiate(self, overlap_fixture):
+ Foo, Bar = overlap_fixture
+ with expect_warnings(
+ re.escape(
+ "relationship 'Bar.foos' will copy column foo.id "
+ "to column bar.foo_id, which conflicts with"
+ )
+ + ".*"
+ + re.escape(
+ "(This warning originated from the `configure_mappers()` "
+ "process, which was "
+ "invoked automatically in response to a user-initiated "
+ "operation.)"
+ ),
+ raise_on_any_unexpected=True,
+ ):
+ foo = Foo()
+ assert hasattr(foo, "bars")
+
+ def test_autoflush_implicit(self, plain_fixture):
+ Foo = plain_fixture
+
+ sess = fixture_session()
+
+ @event.listens_for(Foo, "before_insert")
+ def emit_a_warning(mapper, connection, state):
+ sess.add(Foo())
+
+ sess.add(Foo())
+
+ with expect_warnings(
+ re.escape(
+ "Usage of the 'Session.add()' operation is not "
+ "currently supported within the execution stage of the flush "
+ "process. Results may not be consistent. Consider using "
+ "alternative event listeners or connection-level operations "
+ "instead."
+ )
+ + ".*"
+ + re.escape(
+ "(This warning originated from the Session 'autoflush' "
+ "process, which was invoked automatically in response to a "
+ "user-initiated operation.)"
+ ),
+ raise_on_any_unexpected=True,
+ ):
+ sess.execute(select(Foo))
+
+
class AliasedClassTest(fixtures.MappedTest, AssertsCompiledSQL):
__dialect__ = "default"