from sqlalchemy.sql.expression import Select
+def _indent(text, indent):
+ return "\n".join(indent + line for line in text.split("\n"))
+
+
def before_execute_hook(conn, clauseelement, multiparams, params):
if isinstance(clauseelement, Select):
lint(clauseelement)
# type: (Select) -> None
froms, start_with = find_unmatching_froms(query)
if froms:
- util.warn(
- 'for stmt %s FROM elements %s are not joined up to FROM element "%r"'
- % (
- id(query), # defeat the warnings filter
- ", ".join('"%r"' % f for f in froms),
- start_with,
- )
+ template = '''Query\n{query}\nhas FROM elements:\n{froms}\nthat are not joined up to FROM element\n{start}'''
+ indent = ' '
+ message = template.format(
+ query=_indent(str(query), indent),
+ froms=_indent('\n'.join(str(from_) for from_ in froms), indent + '* '),
+ start=_indent(str(start_with), indent + '* '),
)
+ util.warn(message)
def lint(query):
from sqlalchemy.ext.linter import find_unmatching_froms
from sqlalchemy.testing import fixtures, expect_warnings
from sqlalchemy.testing.schema import Table, Column
+from sqlalchemy.testing.mock import patch
-class TestFinder(fixtures.TablesTest):
+class TestFindUnmatchingFroms(fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
Table("table_a", metadata, Column("col_a", Integer, primary_key=True))
def test_integration(self):
query = (
- select([self.a])
+ select([self.a.c.col_a])
.where(self.b.c.col_b == 5)
)
- # TODO:
- # - make it a unit by mocking or spying "find_unmatching_froms"
- # - Make error string proper
- with expect_warnings(
- r"for stmt .* FROM elements .*table_.*col_.* "
- r"are not joined up to FROM element .*table_.*col_.*"
- ):
- with testing.db.connect() as conn:
- conn.execute(query)
+
+ def deterministic_find_unmatching_froms(query):
+ return find_unmatching_froms(query, start_with=self.a)
+
+ with patch(
+ 'sqlalchemy.ext.linter.find_unmatching_froms',
+ autospec=True,
+ side_effect=deterministic_find_unmatching_froms,
+ ) as m:
+ with expect_warnings(
+ r"Query.*Select table_a\.col_a.*has FROM elements.*table_b.*"
+ r"that are not joined up to FROM element.*table_a.*"
+ ):
+ with testing.db.connect() as conn:
+ conn.execute(query)
+ m.assert_called_once_with(query)
def teardown(self):
event.remove(testing.db, 'before_execute', linter.before_execute_hook)