]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
test single and double quote inspection scenarios
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 13 Jul 2020 14:49:57 +0000 (10:49 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 13 Jul 2020 15:35:05 +0000 (11:35 -0400)
Applied a sweep through all included dialects to ensure names that contain
single or double quotes are properly escaped when querying system tables,
for all :class:`.Inspector` methods that accept object names as an argument
(e.g. table names, view names, etc).   SQLite and MSSQL contained two
quoting issues that were repaired.

Fixes: #5456
Change-Id: I3bc98806f5166f3d82275650079ff561446f2aef

doc/build/changelog/unreleased_13/5456.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mssql/base.py
lib/sqlalchemy/dialects/sqlite/base.py
lib/sqlalchemy/testing/requirements.py
lib/sqlalchemy/testing/suite/test_reflection.py
test/requirements.py

diff --git a/doc/build/changelog/unreleased_13/5456.rst b/doc/build/changelog/unreleased_13/5456.rst
new file mode 100644 (file)
index 0000000..823f373
--- /dev/null
@@ -0,0 +1,9 @@
+.. change::
+    :tags: bug, reflection, sqlite, mssql
+    :tickets: 5456
+
+    Applied a sweep through all included dialects to ensure names that contain
+    single or double quotes are properly escaped when querying system tables,
+    for all :class:`.Inspector` methods that accept object names as an argument
+    (e.g. table names, view names, etc).   SQLite and MSSQL contained two
+    quoting issues that were repaired.
index 06ea80b9e3fd2572b5f3953c9f9f3bf88dc9bf78..35e6799c5984464c45036c4479c97b633994a7eb 100644 (file)
@@ -2887,9 +2887,12 @@ class MSDialect(default.DefaultDialect):
         for col in cols:
             colmap[col["name"]] = col
         # We also run an sp_columns to check for identity columns:
-        cursor = connection.exec_driver_sql(
-            "sp_columns @table_name = '%s', "
-            "@table_owner = '%s'" % (tablename, owner)
+        cursor = connection.execute(
+            sql.text(
+                "sp_columns @table_name = :table_name, "
+                "@table_owner = :table_owner",
+            ),
+            {"table_name": tablename, "table_owner": owner},
         )
         ic = None
         while True:
index a203e786e202d724349ace54cb3c5f1c0605a99b..2868eabbac031ed7b31eba2477aa1b7bb38af22e 100644 (file)
@@ -1664,27 +1664,26 @@ class SQLiteDialect(default.DefaultDialect):
         if schema is not None:
             qschema = self.identifier_preparer.quote_identifier(schema)
             master = "%s.sqlite_master" % qschema
-            s = ("SELECT sql FROM %s WHERE name = '%s'" "AND type='view'") % (
+            s = ("SELECT sql FROM %s WHERE name = AND type='view'") % (
                 master,
-                view_name,
             )
-            rs = connection.exec_driver_sql(s)
+            rs = connection.exec_driver_sql(s, (view_name,))
         else:
             try:
                 s = (
                     "SELECT sql FROM "
                     " (SELECT * FROM sqlite_master UNION ALL "
                     "  SELECT * FROM sqlite_temp_master) "
-                    "WHERE name = '%s' "
+                    "WHERE name = ? "
                     "AND type='view'"
-                ) % view_name
-                rs = connection.exec_driver_sql(s)
+                )
+                rs = connection.exec_driver_sql(s, (view_name,))
             except exc.DBAPIError:
                 s = (
-                    "SELECT sql FROM sqlite_master WHERE name = '%s' "
+                    "SELECT sql FROM sqlite_master WHERE name = ? "
                     "AND type='view'"
-                ) % view_name
-                rs = connection.exec_driver_sql(s)
+                )
+                rs = connection.exec_driver_sql(s, (view_name,))
 
         result = rs.fetchall()
         if result:
@@ -2132,19 +2131,17 @@ class SQLiteDialect(default.DefaultDialect):
                 "SELECT sql FROM "
                 " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
                 "  SELECT * FROM %(schema)ssqlite_temp_master) "
-                "WHERE name = '%(table)s' "
-                "AND type = 'table'"
-                % {"schema": schema_expr, "table": table_name}
+                "WHERE name = ? "
+                "AND type = 'table'" % {"schema": schema_expr}
             )
-            rs = connection.exec_driver_sql(s)
+            rs = connection.exec_driver_sql(s, (table_name,))
         except exc.DBAPIError:
             s = (
                 "SELECT sql FROM %(schema)ssqlite_master "
-                "WHERE name = '%(table)s' "
-                "AND type = 'table'"
-                % {"schema": schema_expr, "table": table_name}
+                "WHERE name = ? "
+                "AND type = 'table'" % {"schema": schema_expr}
             )
-            rs = connection.exec_driver_sql(s)
+            rs = connection.exec_driver_sql(s, (table_name,))
         return rs.scalar()
 
     def _get_table_pragma(self, connection, pragma, table_name, schema=None):
index 1c350da3b1338f5b591f36ebc8e2ac4b5169a098..72f2612aac042ed973a6610c2bd5bf5c4128b3a6 100644 (file)
@@ -634,6 +634,13 @@ class SuiteRequirements(Requirements):
         """
         return exclusions.closed()
 
+    @property
+    def symbol_names_w_double_quote(self):
+        """Target driver can create tables with a name like 'some " table'
+
+        """
+        return exclusions.open()
+
     @property
     def datetime_literals(self):
         """target dialect supports rendering of a date, time, or datetime as a
index 84b3aba5be4d1f4b5164c043ffae058e4edbd116..776d559d40ee3847be45b18a54bb2a8564e3eb33 100644 (file)
@@ -134,6 +134,144 @@ class HasIndexTest(fixtures.TablesTest):
             )
 
 
+class QuotedNameArgumentTest(fixtures.TablesTest):
+    run_create_tables = "once"
+    __backend__ = True
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            "quote ' one",
+            metadata,
+            Column("id", Integer),
+            Column("name", String(50)),
+            Column("data", String(50)),
+            Column("related_id", Integer),
+            sa.PrimaryKeyConstraint("id", name="pk quote ' one"),
+            sa.Index("ix quote ' one", "name"),
+            sa.UniqueConstraint("data", name="uq quote' one",),
+            sa.ForeignKeyConstraint(
+                ["id"], ["related.id"], name="fk quote ' one"
+            ),
+            sa.CheckConstraint("name != 'foo'", name="ck quote ' one"),
+            comment=r"""quote ' one comment""",
+            test_needs_fk=True,
+        )
+
+        if testing.requires.symbol_names_w_double_quote.enabled:
+            Table(
+                'quote " two',
+                metadata,
+                Column("id", Integer),
+                Column("name", String(50)),
+                Column("data", String(50)),
+                Column("related_id", Integer),
+                sa.PrimaryKeyConstraint("id", name='pk quote " two'),
+                sa.Index('ix quote " two', "name"),
+                sa.UniqueConstraint("data", name='uq quote" two',),
+                sa.ForeignKeyConstraint(
+                    ["id"], ["related.id"], name='fk quote " two'
+                ),
+                sa.CheckConstraint("name != 'foo'", name='ck quote " two '),
+                comment=r"""quote " two comment""",
+                test_needs_fk=True,
+            )
+
+        Table(
+            "related",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("related", Integer),
+            test_needs_fk=True,
+        )
+
+        if testing.requires.view_column_reflection.enabled:
+
+            if testing.requires.symbol_names_w_double_quote.enabled:
+                names = [
+                    "quote ' one",
+                    'quote " two',
+                ]
+            else:
+                names = [
+                    "quote ' one",
+                ]
+            for name in names:
+                query = "CREATE VIEW %s AS SELECT * FROM %s" % (
+                    testing.db.dialect.identifier_preparer.quote(
+                        "view %s" % name
+                    ),
+                    testing.db.dialect.identifier_preparer.quote(name),
+                )
+
+                event.listen(metadata, "after_create", DDL(query))
+                event.listen(
+                    metadata,
+                    "before_drop",
+                    DDL(
+                        "DROP VIEW %s"
+                        % testing.db.dialect.identifier_preparer.quote(
+                            "view %s" % name
+                        )
+                    ),
+                )
+
+    def quote_fixtures(fn):
+        return testing.combinations(
+            ("quote ' one",),
+            ('quote " two', testing.requires.symbol_names_w_double_quote),
+        )(fn)
+
+    @quote_fixtures
+    def test_get_table_options(self, name):
+        insp = inspect(testing.db)
+
+        insp.get_table_options(name)
+
+    @quote_fixtures
+    def test_get_view_definition(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_view_definition("view %s" % name)
+
+    @quote_fixtures
+    def test_get_columns(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_columns(name)
+
+    @quote_fixtures
+    def test_get_pk_constraint(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_pk_constraint(name)
+
+    @quote_fixtures
+    def test_get_foreign_keys(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_foreign_keys(name)
+
+    @quote_fixtures
+    def test_get_indexes(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_indexes(name)
+
+    @quote_fixtures
+    @testing.requires.unique_constraint_reflection
+    def test_get_unique_constraints(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_unique_constraints(name)
+
+    @quote_fixtures
+    @testing.requires.comment_reflection
+    def test_get_table_comment(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_table_comment(name)
+
+    @quote_fixtures
+    @testing.requires.check_constraint_reflection
+    def test_get_check_constraints(self, name):
+        insp = inspect(testing.db)
+        assert insp.get_check_constraints(name)
+
+
 class ComponentReflectionTest(fixtures.TablesTest):
     run_inserts = run_deletes = None
 
@@ -1254,6 +1392,7 @@ class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest):
 
 __all__ = (
     "ComponentReflectionTest",
+    "QuotedNameArgumentTest",
     "HasTableTest",
     "HasIndexTest",
     "NormalizedNameTest",
index 1ab0993c69fd756562879083e501e7519fd8c506..28f955fa5fd6dd20a1cc1fc6de43aba42c014124 100644 (file)
@@ -845,6 +845,16 @@ class DefaultRequirements(SuiteRequirements):
             ]
         )
 
+    @property
+    def symbol_names_w_double_quote(self):
+        """Target driver can create tables with a name like 'some " table'
+
+        """
+
+        return skip_if(
+            [no_support("oracle", "ORA-03001: unimplemented feature")]
+        )
+
     @property
     def emulated_lastrowid(self):
         """"target dialect retrieves cursor.lastrowid or an equivalent