]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
test single and double quote inspection scenarios
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 13 Jul 2020 14:49:57 +0000 (10:49 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 13 Jul 2020 15:36:40 +0000 (11:36 -0400)
Applied a sweep through all included dialects to ensure names that contain
single or double quotes are properly escaped when querying system tables,
for all :class:`.Inspector` methods that accept object names as an argument
(e.g. table names, view names, etc).   SQLite and MSSQL contained two
quoting issues that were repaired.

Fixes: #5456
Change-Id: I3bc98806f5166f3d82275650079ff561446f2aef
(cherry picked from commit 4351f7183c81bdb64a9a3ba47aa4b9891f42ffdb)

doc/build/changelog/unreleased_13/5456.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mssql/base.py
lib/sqlalchemy/dialects/sqlite/base.py
lib/sqlalchemy/testing/requirements.py
lib/sqlalchemy/testing/suite/test_reflection.py
test/requirements.py

diff --git a/doc/build/changelog/unreleased_13/5456.rst b/doc/build/changelog/unreleased_13/5456.rst
new file mode 100644 (file)
index 0000000..823f373
--- /dev/null
@@ -0,0 +1,9 @@
+.. change::
+    :tags: bug, reflection, sqlite, mssql
+    :tickets: 5456
+
+    Applied a sweep through all included dialects to ensure names that contain
+    single or double quotes are properly escaped when querying system tables,
+    for all :class:`.Inspector` methods that accept object names as an argument
+    (e.g. table names, view names, etc).   SQLite and MSSQL contained two
+    quoting issues that were repaired.
index e66a8fd5d0f02ec36c3dfe3139e7210a68b58913..87959856ff95feff77049399c183e603636cfda8 100644 (file)
@@ -2738,8 +2738,11 @@ class MSDialect(default.DefaultDialect):
             colmap[col["name"]] = col
         # We also run an sp_columns to check for identity columns:
         cursor = connection.execute(
-            "sp_columns @table_name = '%s', "
-            "@table_owner = '%s'" % (tablename, owner)
+            sql.text(
+                "sp_columns @table_name = :table_name, "
+                "@table_owner = :table_owner",
+            ),
+            {"table_name": tablename, "table_owner": owner},
         )
         ic = None
         while True:
index 77afe5fb1b4d73989b9f43269cd58d630dd0ccb8..44eb2e1d5110a8f8f373ffa44e7e310855fb7dd5 100644 (file)
@@ -1660,27 +1660,26 @@ class SQLiteDialect(default.DefaultDialect):
         if schema is not None:
             qschema = self.identifier_preparer.quote_identifier(schema)
             master = "%s.sqlite_master" % qschema
-            s = ("SELECT sql FROM %s WHERE name = '%s'" "AND type='view'") % (
+            s = ("SELECT sql FROM %s WHERE name = AND type='view'") % (
                 master,
-                view_name,
             )
-            rs = connection.execute(s)
+            rs = connection.execute(s, (view_name,))
         else:
             try:
                 s = (
                     "SELECT sql FROM "
                     " (SELECT * FROM sqlite_master UNION ALL "
                     "  SELECT * FROM sqlite_temp_master) "
-                    "WHERE name = '%s' "
+                    "WHERE name = ? "
                     "AND type='view'"
-                ) % view_name
-                rs = connection.execute(s)
+                )
+                rs = connection.execute(s, (view_name,))
             except exc.DBAPIError:
                 s = (
-                    "SELECT sql FROM sqlite_master WHERE name = '%s' "
+                    "SELECT sql FROM sqlite_master WHERE name = ? "
                     "AND type='view'"
-                ) % view_name
-                rs = connection.execute(s)
+                )
+                rs = connection.execute(s, (view_name,))
 
         result = rs.fetchall()
         if result:
@@ -2124,19 +2123,17 @@ class SQLiteDialect(default.DefaultDialect):
                 "SELECT sql FROM "
                 " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
                 "  SELECT * FROM %(schema)ssqlite_temp_master) "
-                "WHERE name = '%(table)s' "
-                "AND type = 'table'"
-                % {"schema": schema_expr, "table": table_name}
+                "WHERE name = ? "
+                "AND type = 'table'" % {"schema": schema_expr}
             )
-            rs = connection.execute(s)
+            rs = connection.execute(s, (table_name,))
         except exc.DBAPIError:
             s = (
                 "SELECT sql FROM %(schema)ssqlite_master "
-                "WHERE name = '%(table)s' "
-                "AND type = 'table'"
-                % {"schema": schema_expr, "table": table_name}
+                "WHERE name = ? "
+                "AND type = 'table'" % {"schema": schema_expr}
             )
-            rs = connection.execute(s)
+            rs = connection.execute(s, (table_name,))
         return rs.scalar()
 
     def _get_table_pragma(self, connection, pragma, table_name, schema=None):
index e026b1691880d8fcfdb76b7b132980fbc0097320..c49c7be52b3cf6908a4b42b0fa6e904d3098869e 100644 (file)
@@ -561,6 +561,13 @@ class SuiteRequirements(Requirements):
         """
         return exclusions.closed()
 
+    @property
+    def symbol_names_w_double_quote(self):
+        """Target driver can create tables with a name like 'some " table'
+
+        """
+        return exclusions.open()
+
     @property
     def datetime_literals(self):
         """target dialect supports rendering of a date, time, or datetime as a
index ce19c4aeddf780a4bb31dc82cf6fbfd6aa07954c..2142dd84ba61751b687be8a3b3fc8764db221200 100644 (file)
@@ -78,6 +78,144 @@ class HasTableTest(fixtures.TablesTest):
             )
 
 
+class QuotedNameArgumentTest(fixtures.TablesTest):
+    run_create_tables = "once"
+    __backend__ = True
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            "quote ' one",
+            metadata,
+            Column("id", Integer),
+            Column("name", String(50)),
+            Column("data", String(50)),
+            Column("related_id", Integer),
+            sa.PrimaryKeyConstraint("id", name="pk quote ' one"),
+            sa.Index("ix quote ' one", "name"),
+            sa.UniqueConstraint("data", name="uq quote' one",),
+            sa.ForeignKeyConstraint(
+                ["id"], ["related.id"], name="fk quote ' one"
+            ),
+            sa.CheckConstraint("name != 'foo'", name="ck quote ' one"),
+            comment=r"""quote ' one comment""",
+            test_needs_fk=True,
+        )
+
+        if testing.requires.symbol_names_w_double_quote.enabled:
+            Table(
+                'quote " two',
+                metadata,
+                Column("id", Integer),
+                Column("name", String(50)),
+                Column("data", String(50)),
+                Column("related_id", Integer),
+                sa.PrimaryKeyConstraint("id", name='pk quote " two'),
+                sa.Index('ix quote " two', "name"),
+                sa.UniqueConstraint("data", name='uq quote" two',),
+                sa.ForeignKeyConstraint(
+                    ["id"], ["related.id"], name='fk quote " two'
+                ),
+                sa.CheckConstraint("name != 'foo'", name='ck quote " two '),
+                comment=r"""quote " two comment""",
+                test_needs_fk=True,
+            )
+
+        Table(
+            "related",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("related", Integer),
+            test_needs_fk=True,
+        )
+
+        if testing.requires.view_column_reflection.enabled:
+
+            if testing.requires.symbol_names_w_double_quote.enabled:
+                names = [
+                    "quote ' one",
+                    'quote " two',
+                ]
+            else:
+                names = [
+                    "quote ' one",
+                ]
+            for name in names:
+                query = "CREATE VIEW %s AS SELECT * FROM %s" % (
+                    testing.db.dialect.identifier_preparer.quote(
+                        "view %s" % name
+                    ),
+                    testing.db.dialect.identifier_preparer.quote(name),
+                )
+
+                event.listen(metadata, "after_create", DDL(query))
+                event.listen(
+                    metadata,
+                    "before_drop",
+                    DDL(
+                        "DROP VIEW %s"
+                        % testing.db.dialect.identifier_preparer.quote(
+                            "view %s" % name
+                        )
+                    ),
+                )
+
+    def quote_fixtures(fn):
+        return testing.combinations(
+            ("quote ' one",),
+            ('quote " two', testing.requires.symbol_names_w_double_quote),
+        )(fn)
+
+    @quote_fixtures
+    def test_get_table_options(self, name):
+        insp = inspect(testing.db)
+
+        insp.get_table_options(name)
+
+    @quote_fixtures
+    def test_get_view_definition(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_view_definition("view %s" % name)
+
+    @quote_fixtures
+    def test_get_columns(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_columns(name)
+
+    @quote_fixtures
+    def test_get_pk_constraint(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_pk_constraint(name)
+
+    @quote_fixtures
+    def test_get_foreign_keys(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_foreign_keys(name)
+
+    @quote_fixtures
+    def test_get_indexes(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_indexes(name)
+
+    @quote_fixtures
+    @testing.requires.unique_constraint_reflection
+    def test_get_unique_constraints(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_unique_constraints(name)
+
+    @quote_fixtures
+    @testing.requires.comment_reflection
+    def test_get_table_comment(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_table_comment(name)
+
+    @quote_fixtures
+    @testing.requires.check_constraint_reflection
+    def test_get_check_constraints(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_check_constraints(name)
+
+
 class ComponentReflectionTest(fixtures.TablesTest):
     run_inserts = run_deletes = None
 
@@ -1193,6 +1331,7 @@ class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest):
 
 __all__ = (
     "ComponentReflectionTest",
+    "QuotedNameArgumentTest",
     "HasTableTest",
     "NormalizedNameTest",
     "ComputedReflectionTest",
index c739253f0215b616b9038a2ef4ee4533d028e39e..25b2e5880ff6b2e1d66855c77572d81ff5a79906 100644 (file)
@@ -798,6 +798,16 @@ class DefaultRequirements(SuiteRequirements):
             ]
         )
 
+    @property
+    def symbol_names_w_double_quote(self):
+        """Target driver can create tables with a name like 'some " table'
+
+        """
+
+        return skip_if(
+            [no_support("oracle", "ORA-03001: unimplemented feature")]
+        )
+
     @property
     def emulated_lastrowid(self):
         """"target dialect retrieves cursor.lastrowid or an equivalent