From 135070f75e0216eef1a85e7ad911b20ed46edc3f Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Mon, 13 Jul 2020 10:49:57 -0400 Subject: [PATCH] test single and double quote inspection scenarios Applied a sweep through all included dialects to ensure names that contain single or double quotes are properly escaped when querying system tables, for all :class:`.Inspector` methods that accept object names as an argument (e.g. table names, view names, etc). SQLite and MSSQL contained two quoting issues that were repaired. Fixes: #5456 Change-Id: I3bc98806f5166f3d82275650079ff561446f2aef (cherry picked from commit 4351f7183c81bdb64a9a3ba47aa4b9891f42ffdb) --- doc/build/changelog/unreleased_13/5456.rst | 9 ++ lib/sqlalchemy/dialects/mssql/base.py | 7 +- lib/sqlalchemy/dialects/sqlite/base.py | 31 ++-- lib/sqlalchemy/testing/requirements.py | 7 + .../testing/suite/test_reflection.py | 139 ++++++++++++++++++ test/requirements.py | 10 ++ 6 files changed, 184 insertions(+), 19 deletions(-) create mode 100644 doc/build/changelog/unreleased_13/5456.rst diff --git a/doc/build/changelog/unreleased_13/5456.rst b/doc/build/changelog/unreleased_13/5456.rst new file mode 100644 index 0000000000..823f3731b9 --- /dev/null +++ b/doc/build/changelog/unreleased_13/5456.rst @@ -0,0 +1,9 @@ +.. change:: + :tags: bug, reflection, sqlite, mssql + :tickets: 5456 + + Applied a sweep through all included dialects to ensure names that contain + single or double quotes are properly escaped when querying system tables, + for all :class:`.Inspector` methods that accept object names as an argument + (e.g. table names, view names, etc). SQLite and MSSQL contained two + quoting issues that were repaired. diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py index e66a8fd5d0..87959856ff 100644 --- a/lib/sqlalchemy/dialects/mssql/base.py +++ b/lib/sqlalchemy/dialects/mssql/base.py @@ -2738,8 +2738,11 @@ class MSDialect(default.DefaultDialect): colmap[col["name"]] = col # We also run an sp_columns to check for identity columns: cursor = connection.execute( - "sp_columns @table_name = '%s', " - "@table_owner = '%s'" % (tablename, owner) + sql.text( + "sp_columns @table_name = :table_name, " + "@table_owner = :table_owner", + ), + {"table_name": tablename, "table_owner": owner}, ) ic = None while True: diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index 77afe5fb1b..44eb2e1d51 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -1660,27 +1660,26 @@ class SQLiteDialect(default.DefaultDialect): if schema is not None: qschema = self.identifier_preparer.quote_identifier(schema) master = "%s.sqlite_master" % qschema - s = ("SELECT sql FROM %s WHERE name = '%s'" "AND type='view'") % ( + s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % ( master, - view_name, ) - rs = connection.execute(s) + rs = connection.execute(s, (view_name,)) else: try: s = ( "SELECT sql FROM " " (SELECT * FROM sqlite_master UNION ALL " " SELECT * FROM sqlite_temp_master) " - "WHERE name = '%s' " + "WHERE name = ? " "AND type='view'" - ) % view_name - rs = connection.execute(s) + ) + rs = connection.execute(s, (view_name,)) except exc.DBAPIError: s = ( - "SELECT sql FROM sqlite_master WHERE name = '%s' " + "SELECT sql FROM sqlite_master WHERE name = ? " "AND type='view'" - ) % view_name - rs = connection.execute(s) + ) + rs = connection.execute(s, (view_name,)) result = rs.fetchall() if result: @@ -2124,19 +2123,17 @@ class SQLiteDialect(default.DefaultDialect): "SELECT sql FROM " " (SELECT * FROM %(schema)ssqlite_master UNION ALL " " SELECT * FROM %(schema)ssqlite_temp_master) " - "WHERE name = '%(table)s' " - "AND type = 'table'" - % {"schema": schema_expr, "table": table_name} + "WHERE name = ? " + "AND type = 'table'" % {"schema": schema_expr} ) - rs = connection.execute(s) + rs = connection.execute(s, (table_name,)) except exc.DBAPIError: s = ( "SELECT sql FROM %(schema)ssqlite_master " - "WHERE name = '%(table)s' " - "AND type = 'table'" - % {"schema": schema_expr, "table": table_name} + "WHERE name = ? " + "AND type = 'table'" % {"schema": schema_expr} ) - rs = connection.execute(s) + rs = connection.execute(s, (table_name,)) return rs.scalar() def _get_table_pragma(self, connection, pragma, table_name, schema=None): diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index e026b16918..c49c7be52b 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -561,6 +561,13 @@ class SuiteRequirements(Requirements): """ return exclusions.closed() + @property + def symbol_names_w_double_quote(self): + """Target driver can create tables with a name like 'some " table' + + """ + return exclusions.open() + @property def datetime_literals(self): """target dialect supports rendering of a date, time, or datetime as a diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py index ce19c4aedd..2142dd84ba 100644 --- a/lib/sqlalchemy/testing/suite/test_reflection.py +++ b/lib/sqlalchemy/testing/suite/test_reflection.py @@ -78,6 +78,144 @@ class HasTableTest(fixtures.TablesTest): ) +class QuotedNameArgumentTest(fixtures.TablesTest): + run_create_tables = "once" + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "quote ' one", + metadata, + Column("id", Integer), + Column("name", String(50)), + Column("data", String(50)), + Column("related_id", Integer), + sa.PrimaryKeyConstraint("id", name="pk quote ' one"), + sa.Index("ix quote ' one", "name"), + sa.UniqueConstraint("data", name="uq quote' one",), + sa.ForeignKeyConstraint( + ["id"], ["related.id"], name="fk quote ' one" + ), + sa.CheckConstraint("name != 'foo'", name="ck quote ' one"), + comment=r"""quote ' one comment""", + test_needs_fk=True, + ) + + if testing.requires.symbol_names_w_double_quote.enabled: + Table( + 'quote " two', + metadata, + Column("id", Integer), + Column("name", String(50)), + Column("data", String(50)), + Column("related_id", Integer), + sa.PrimaryKeyConstraint("id", name='pk quote " two'), + sa.Index('ix quote " two', "name"), + sa.UniqueConstraint("data", name='uq quote" two',), + sa.ForeignKeyConstraint( + ["id"], ["related.id"], name='fk quote " two' + ), + sa.CheckConstraint("name != 'foo'", name='ck quote " two '), + comment=r"""quote " two comment""", + test_needs_fk=True, + ) + + Table( + "related", + metadata, + Column("id", Integer, primary_key=True), + Column("related", Integer), + test_needs_fk=True, + ) + + if testing.requires.view_column_reflection.enabled: + + if testing.requires.symbol_names_w_double_quote.enabled: + names = [ + "quote ' one", + 'quote " two', + ] + else: + names = [ + "quote ' one", + ] + for name in names: + query = "CREATE VIEW %s AS SELECT * FROM %s" % ( + testing.db.dialect.identifier_preparer.quote( + "view %s" % name + ), + testing.db.dialect.identifier_preparer.quote(name), + ) + + event.listen(metadata, "after_create", DDL(query)) + event.listen( + metadata, + "before_drop", + DDL( + "DROP VIEW %s" + % testing.db.dialect.identifier_preparer.quote( + "view %s" % name + ) + ), + ) + + def quote_fixtures(fn): + return testing.combinations( + ("quote ' one",), + ('quote " two', testing.requires.symbol_names_w_double_quote), + )(fn) + + @quote_fixtures + def test_get_table_options(self, name): + insp = inspect(testing.db) + + insp.get_table_options(name) + + @quote_fixtures + def test_get_view_definition(self, name): + insp = inspect(testing.db) + assert insp.get_view_definition("view %s" % name) + + @quote_fixtures + def test_get_columns(self, name): + insp = inspect(testing.db) + assert insp.get_columns(name) + + @quote_fixtures + def test_get_pk_constraint(self, name): + insp = inspect(testing.db) + assert insp.get_pk_constraint(name) + + @quote_fixtures + def test_get_foreign_keys(self, name): + insp = inspect(testing.db) + assert insp.get_foreign_keys(name) + + @quote_fixtures + def test_get_indexes(self, name): + insp = inspect(testing.db) + assert insp.get_indexes(name) + + @quote_fixtures + @testing.requires.unique_constraint_reflection + def test_get_unique_constraints(self, name): + insp = inspect(testing.db) + assert insp.get_unique_constraints(name) + + @quote_fixtures + @testing.requires.comment_reflection + def test_get_table_comment(self, name): + insp = inspect(testing.db) + assert insp.get_table_comment(name) + + @quote_fixtures + @testing.requires.check_constraint_reflection + def test_get_check_constraints(self, name): + insp = inspect(testing.db) + assert insp.get_check_constraints(name) + + class ComponentReflectionTest(fixtures.TablesTest): run_inserts = run_deletes = None @@ -1193,6 +1331,7 @@ class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest): __all__ = ( "ComponentReflectionTest", + "QuotedNameArgumentTest", "HasTableTest", "NormalizedNameTest", "ComputedReflectionTest", diff --git a/test/requirements.py b/test/requirements.py index c739253f02..25b2e5880f 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -798,6 +798,16 @@ class DefaultRequirements(SuiteRequirements): ] ) + @property + def symbol_names_w_double_quote(self): + """Target driver can create tables with a name like 'some " table' + + """ + + return skip_if( + [no_support("oracle", "ORA-03001: unimplemented feature")] + ) + @property def emulated_lastrowid(self): """"target dialect retrieves cursor.lastrowid or an equivalent -- 2.39.5