]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Expand reg for schema translate map for most special characters
authorMike Bayer <mike_mp@zzzcomputing.com>
Wed, 7 Apr 2021 14:56:12 +0000 (10:56 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Wed, 7 Apr 2021 14:57:36 +0000 (10:57 -0400)
Fixed regression where usage of a token in the
:paramref:`_engine.Connection.execution_options.schema_translate_map`
dictionary which contained special characters such as braces would fail to
be substituted properly. Use of square bracket characters ``[]`` is now
explicitly disallowed as these are used as a delimiter character in the
current implementation.

Fixes: #6216
Change-Id: I7ccfc2292b17340054cedf485ed1adf3119b96c8

doc/build/changelog/unreleased_14/6216.rst [new file with mode: 0644]
lib/sqlalchemy/sql/compiler.py
test/sql/test_compiler.py

diff --git a/doc/build/changelog/unreleased_14/6216.rst b/doc/build/changelog/unreleased_14/6216.rst
new file mode 100644 (file)
index 0000000..b8779e9
--- /dev/null
@@ -0,0 +1,10 @@
+.. change::
+    :tags: bug, regression, schema
+    :tickets: 6216
+
+    Fixed regression where usage of a token in the
+    :paramref:`_engine.Connection.execution_options.schema_translate_map`
+    dictionary which contained special characters such as braces would fail to
+    be substituted properly. Use of square bracket characters ``[]`` is now
+    explicitly disallowed as these are used as a delimiter character in the
+    current implementation.
index 61dfbeb07dc9ed0fe83701eaf54c2d9c29e7bc4e..9aa747056cacb35f55b3c1dea7a94262b97e4825 100644 (file)
@@ -4661,6 +4661,11 @@ class IdentifierPreparer(object):
         def symbol_getter(obj):
             name = obj.schema
             if name in schema_translate_map and obj._use_schema_map:
+                if name is not None and ("[" in name or "]" in name):
+                    raise exc.CompileError(
+                        "Square bracket characters ([]) not supported "
+                        "in schema translate name '%s'" % name
+                    )
                 return quoted_name(
                     "[SCHEMA_%s]" % (name or "_none"), quote=False
                 )
@@ -4688,7 +4693,7 @@ class IdentifierPreparer(object):
                     )
             return self.quote(effective_schema)
 
-        return re.sub(r"(\[SCHEMA_([\w\d_]+)\])", replace, statement)
+        return re.sub(r"(\[SCHEMA_([^\]]+)\])", replace, statement)
 
     def _escape_identifier(self, value):
         """Escape an identifier.
index d7a74ee2eab4d1df6d04f3ddccbf343fc364daee..1ff5788b7f7fdf84c8f56b2d407fa77ebfcc70b1 100644 (file)
@@ -4854,6 +4854,76 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
             default_schema_name="main",
         )
 
+    def test_schema_translate_map_special_chars(self):
+        m = MetaData()
+        t1 = Table("t1", m, Column("q", Integer))
+        t2 = Table("t2", m, Column("q", Integer), schema="foo % ^ #")
+        t3 = Table("t3", m, Column("q", Integer), schema="bar {}")
+
+        schema_translate_map = {None: "z", "bar {}": None, "foo % ^ #": "bat"}
+
+        self.assert_compile(
+            schema.CreateTable(t1),
+            "CREATE TABLE [SCHEMA__none].t1 (q INTEGER)",
+            schema_translate_map=schema_translate_map,
+        )
+        self.assert_compile(
+            schema.CreateTable(t1),
+            "CREATE TABLE z.t1 (q INTEGER)",
+            schema_translate_map=schema_translate_map,
+            render_schema_translate=True,
+        )
+
+        self.assert_compile(
+            schema.CreateTable(t2),
+            "CREATE TABLE [SCHEMA_foo % ^ #].t2 (q INTEGER)",
+            schema_translate_map=schema_translate_map,
+        )
+        self.assert_compile(
+            schema.CreateTable(t2),
+            "CREATE TABLE bat.t2 (q INTEGER)",
+            schema_translate_map=schema_translate_map,
+            render_schema_translate=True,
+        )
+
+        self.assert_compile(
+            schema.CreateTable(t3),
+            "CREATE TABLE [SCHEMA_bar {}].t3 (q INTEGER)",
+            schema_translate_map=schema_translate_map,
+        )
+        self.assert_compile(
+            schema.CreateTable(t3),
+            "CREATE TABLE main.t3 (q INTEGER)",
+            schema_translate_map=schema_translate_map,
+            render_schema_translate=True,
+            default_schema_name="main",
+        )
+
+    def test_schema_translate_map_no_square_brackets(self):
+        m = MetaData()
+        t2 = Table("t2", m, Column("q", Integer), schema="bar [")
+        t3 = Table("t3", m, Column("q", Integer), schema="foo ]")
+
+        schema_translate_map = {None: "z", "bar [": None, "foo ]": "bat"}
+
+        assert_raises_message(
+            exc.CompileError,
+            r"Square bracket characters .* not supported "
+            r"in schema translate name 'bar \['",
+            schema.CreateTable(t2).compile,
+            schema_translate_map=schema_translate_map,
+            render_schema_translate=True,
+        )
+
+        assert_raises_message(
+            exc.CompileError,
+            r"Square bracket characters .* not supported "
+            r"in schema translate name 'foo \]'",
+            schema.CreateTable(t3).compile,
+            schema_translate_map=schema_translate_map,
+            render_schema_translate=True,
+        )
+
     def test_schema_translate_map_sequence(self):
         s1 = schema.Sequence("s1")
         s2 = schema.Sequence("s2", schema="foo")