]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Add deprecation warning for mssql legacy_schema_aliasing
authorGord Thompson <gord@gordthompson.com>
Thu, 20 Aug 2020 22:26:13 +0000 (16:26 -0600)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 24 Aug 2020 15:23:33 +0000 (11:23 -0400)
Fixes: #4809
Change-Id: I9ce2a5dfb79d86624c187ee28b5911fd14328ce2

doc/build/changelog/unreleased_14/4809.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mssql/base.py
lib/sqlalchemy/dialects/mssql/provision.py
test/dialect/mssql/test_deprecations.py [new file with mode: 0644]
test/dialect/mssql/test_engine.py
test/dialect/mssql/test_query.py

diff --git a/doc/build/changelog/unreleased_14/4809.rst b/doc/build/changelog/unreleased_14/4809.rst
new file mode 100644 (file)
index 0000000..bb7bc66
--- /dev/null
@@ -0,0 +1,7 @@
+.. change::
+    :tags: mssql, engine
+    :tickets: 4809
+
+    Deprecated the ``legacy_schema_aliasing`` parameter to
+    :meth:`_sa.create_engine`.   This is a long-outdated parameter that has
+    defaulted to False since version 1.1.
index d18fb72990ac2d1ccf68671626ad07d4330985bd..f38c537fdcf167fc89db62953fdc00ae8e5f2fa8 100644 (file)
@@ -537,6 +537,10 @@ it is available using the ``legacy_schema_aliasing`` argument to
    in version 1.0.5 to allow disabling of legacy mode for schemas now
    defaults to False.
 
+.. deprecated:: 1.4
+
+   The ``legacy_schema_aliasing`` flag is now
+   deprecated and will be removed in a future release.
 
 .. _mssql_indexes:
 
@@ -2549,6 +2553,8 @@ class MSDialect(default.DefaultDialect):
     _supports_offset_fetch = False
     _supports_nvarchar_max = False
 
+    legacy_schema_aliasing = False
+
     server_version_info = ()
 
     statement_compiler = MSSQLCompiler
@@ -2573,9 +2579,9 @@ class MSDialect(default.DefaultDialect):
         schema_name="dbo",
         isolation_level=None,
         deprecate_large_types=None,
-        legacy_schema_aliasing=False,
         json_serializer=None,
         json_deserializer=None,
+        legacy_schema_aliasing=None,
         **opts
     ):
         self.query_timeout = int(query_timeout or 0)
@@ -2583,7 +2589,14 @@ class MSDialect(default.DefaultDialect):
 
         self.use_scope_identity = use_scope_identity
         self.deprecate_large_types = deprecate_large_types
-        self.legacy_schema_aliasing = legacy_schema_aliasing
+
+        if legacy_schema_aliasing is not None:
+            util.warn_deprecated(
+                "The legacy_schema_aliasing parameter is "
+                "deprecated and will be removed in a future release.",
+                "1.4",
+            )
+            self.legacy_schema_aliasing = legacy_schema_aliasing
 
         super(MSDialect, self).__init__(**opts)
 
index 23c841da632b7e390ca7f48b14f9d98d197345bc..a5131eae6afc9e2fbde28277b4879542e3313c8b 100644 (file)
@@ -4,12 +4,6 @@ from ...testing.provision import create_db
 from ...testing.provision import drop_db
 from ...testing.provision import log
 from ...testing.provision import run_reap_dbs
-from ...testing.provision import update_db_opts
-
-
-@update_db_opts.for_db("mssql")
-def _mssql_update_db_opts(db_url, db_opts):
-    db_opts["legacy_schema_aliasing"] = False
 
 
 @create_db.for_db("mssql")
diff --git a/test/dialect/mssql/test_deprecations.py b/test/dialect/mssql/test_deprecations.py
new file mode 100644 (file)
index 0000000..c0df266
--- /dev/null
@@ -0,0 +1,244 @@
+# -*- encoding: utf-8
+from sqlalchemy import Column
+from sqlalchemy import engine_from_config
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy import select
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy import testing
+from sqlalchemy.dialects.mssql import base as mssql
+from sqlalchemy.sql import column
+from sqlalchemy.sql import table
+from sqlalchemy.testing import assertions
+from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing import engines
+from sqlalchemy.testing import eq_
+from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import is_
+from sqlalchemy.testing.mock import Mock
+
+
+def _legacy_schema_aliasing_warning():
+    return assertions.expect_deprecated("The legacy_schema_aliasing parameter")
+
+
+class LegacySchemaAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
+    """Legacy behavior tried to prevent schema-qualified tables
+    from being rendered as dotted names, and were instead aliased.
+
+    This behavior no longer seems to be required.
+
+    """
+
+    def setup(self):
+        metadata = MetaData()
+        self.t1 = table(
+            "t1",
+            column("a", Integer),
+            column("b", String),
+            column("c", String),
+        )
+        self.t2 = Table(
+            "t2",
+            metadata,
+            Column("a", Integer),
+            Column("b", Integer),
+            Column("c", Integer),
+            schema="schema",
+        )
+
+    def _assert_sql(self, element, legacy_sql, modern_sql=None):
+        dialect = self._legacy_dialect()
+
+        self.assert_compile(element, legacy_sql, dialect=dialect)
+
+        dialect = mssql.dialect()
+        self.assert_compile(element, modern_sql or "foob", dialect=dialect)
+
+    def _legacy_dialect(self):
+        with _legacy_schema_aliasing_warning():
+            return mssql.dialect(legacy_schema_aliasing=True)
+
+    @testing.combinations(
+        (
+            {
+                "sqlalchemy.url": "mssql://foodsn",
+                "sqlalchemy.legacy_schema_aliasing": "true",
+            },
+            True,
+        ),
+        (
+            {
+                "sqlalchemy.url": "mssql://foodsn",
+                "sqlalchemy.legacy_schema_aliasing": "false",
+            },
+            False,
+        ),
+    )
+    def test_legacy_schema_flag(self, cfg, expected):
+        with testing.expect_deprecated("The legacy_schema_aliasing parameter"):
+            e = engine_from_config(
+                cfg, module=Mock(version="MS SQL Server 11.0.92")
+            )
+            is_(e.dialect.legacy_schema_aliasing, expected)
+
+    def test_result_map(self):
+        s = self.t2.select()
+        c = s.compile(dialect=self._legacy_dialect())
+        assert self.t2.c.a in set(c._create_result_map()["a"][1])
+
+    def test_result_map_use_labels(self):
+        s = self.t2.select(use_labels=True)
+        c = s.compile(dialect=self._legacy_dialect())
+        assert self.t2.c.a in set(c._create_result_map()["schema_t2_a"][1])
+
+    def test_straight_select(self):
+        self._assert_sql(
+            self.t2.select(),
+            "SELECT t2_1.a, t2_1.b, t2_1.c FROM [schema].t2 AS t2_1",
+            "SELECT [schema].t2.a, [schema].t2.b, "
+            "[schema].t2.c FROM [schema].t2",
+        )
+
+    def test_straight_select_use_labels(self):
+        self._assert_sql(
+            self.t2.select(use_labels=True),
+            "SELECT t2_1.a AS schema_t2_a, t2_1.b AS schema_t2_b, "
+            "t2_1.c AS schema_t2_c FROM [schema].t2 AS t2_1",
+            "SELECT [schema].t2.a AS schema_t2_a, "
+            "[schema].t2.b AS schema_t2_b, "
+            "[schema].t2.c AS schema_t2_c FROM [schema].t2",
+        )
+
+    def test_join_to_schema(self):
+        t1, t2 = self.t1, self.t2
+        self._assert_sql(
+            t1.join(t2, t1.c.a == t2.c.a).select(),
+            "SELECT t1.a, t1.b, t1.c, t2_1.a, t2_1.b, t2_1.c FROM t1 "
+            "JOIN [schema].t2 AS t2_1 ON t2_1.a = t1.a",
+            "SELECT t1.a, t1.b, t1.c, [schema].t2.a, [schema].t2.b, "
+            "[schema].t2.c FROM t1 JOIN [schema].t2 ON [schema].t2.a = t1.a",
+        )
+
+    def test_union_schema_to_non(self):
+        t1, t2 = self.t1, self.t2
+        s = (
+            select(t2.c.a, t2.c.b)
+            .apply_labels()
+            .union(select(t1.c.a, t1.c.b).apply_labels())
+            .alias()
+            .select()
+        )
+        self._assert_sql(
+            s,
+            "SELECT anon_1.schema_t2_a, anon_1.schema_t2_b FROM "
+            "(SELECT t2_1.a AS schema_t2_a, t2_1.b AS schema_t2_b "
+            "FROM [schema].t2 AS t2_1 UNION SELECT t1.a AS t1_a, "
+            "t1.b AS t1_b FROM t1) AS anon_1",
+            "SELECT anon_1.schema_t2_a, anon_1.schema_t2_b FROM "
+            "(SELECT [schema].t2.a AS schema_t2_a, [schema].t2.b AS "
+            "schema_t2_b FROM [schema].t2 UNION SELECT t1.a AS t1_a, "
+            "t1.b AS t1_b FROM t1) AS anon_1",
+        )
+
+    def test_column_subquery_to_alias(self):
+        a1 = self.t2.alias("a1")
+        s = select([self.t2, select(a1.c.a).scalar_subquery()])
+        self._assert_sql(
+            s,
+            "SELECT t2_1.a, t2_1.b, t2_1.c, "
+            "(SELECT a1.a FROM [schema].t2 AS a1) "
+            "AS anon_1 FROM [schema].t2 AS t2_1",
+            "SELECT [schema].t2.a, [schema].t2.b, [schema].t2.c, "
+            "(SELECT a1.a FROM [schema].t2 AS a1) AS anon_1 FROM [schema].t2",
+        )
+
+
+class LegacySchemaAliasingBackendTest(
+    testing.AssertsExecutionResults, fixtures.TestBase
+):
+    __only_on__ = "mssql"
+
+    @testing.provide_metadata
+    def test_insertid_schema(self):
+        meta = self.metadata
+
+        with _legacy_schema_aliasing_warning():
+            eng = engines.testing_engine(
+                options=dict(legacy_schema_aliasing=False)
+            )
+
+        tbl = Table(
+            "test",
+            meta,
+            Column("id", Integer, primary_key=True),
+            schema=testing.config.test_schema,
+        )
+
+        with eng.connect() as conn:
+            tbl.create(conn)
+            conn.execute(tbl.insert(), {"id": 1})
+            eq_(conn.scalar(tbl.select()), 1)
+
+    @testing.provide_metadata
+    def test_insertid_schema_legacy(self):
+        meta = self.metadata
+
+        tbl = Table(
+            "test",
+            meta,
+            Column("id", Integer, primary_key=True),
+            schema=testing.config.test_schema,
+        )
+
+        with _legacy_schema_aliasing_warning():
+            eng = engines.testing_engine(
+                options=dict(legacy_schema_aliasing=True)
+            )
+
+        with eng.connect() as conn:
+
+            tbl.create(conn)
+            conn.execute(tbl.insert(), {"id": 1})
+            eq_(conn.scalar(tbl.select()), 1)
+
+    @testing.provide_metadata
+    def test_delete_schema(self, connection):
+        meta = self.metadata
+
+        is_(connection.dialect.legacy_schema_aliasing, False)
+
+        tbl = Table(
+            "test",
+            meta,
+            Column("id", Integer, primary_key=True),
+            schema=testing.config.test_schema,
+        )
+        tbl.create(connection)
+        connection.execute(tbl.insert(), {"id": 1})
+        eq_(connection.scalar(tbl.select()), 1)
+        connection.execute(tbl.delete(tbl.c.id == 1))
+        eq_(connection.scalar(tbl.select()), None)
+
+    @testing.provide_metadata
+    def test_delete_schema_legacy(self):
+        meta = self.metadata
+        with _legacy_schema_aliasing_warning():
+            eng = engines.testing_engine(
+                options=dict(legacy_schema_aliasing=True)
+            )
+
+        tbl = Table(
+            "test",
+            meta,
+            Column("id", Integer, primary_key=True),
+            schema=testing.config.test_schema,
+        )
+
+        with eng.connect() as conn:
+            tbl.create(conn)
+            conn.execute(tbl.insert(), {"id": 1})
+            eq_(conn.scalar(tbl.select()), 1)
+            conn.execute(tbl.delete(tbl.c.id == 1))
+            eq_(conn.scalar(tbl.select()), None)
index 2209414890eb103e32d87fda3ea5580d731b68f9..27e92d5b5c1f3f8adbb07ef831d2863f40fd934e 100644 (file)
@@ -1,7 +1,6 @@
 # -*- encoding: utf-8
 
 from sqlalchemy import Column
-from sqlalchemy import engine_from_config
 from sqlalchemy import event
 from sqlalchemy import exc
 from sqlalchemy import Integer
@@ -339,18 +338,6 @@ class ParseConnectTest(fixtures.TestBase):
         )
 
 
-class EngineFromConfigTest(fixtures.TestBase):
-    def test_legacy_schema_flag(self):
-        cfg = {
-            "sqlalchemy.url": "mssql://foodsn",
-            "sqlalchemy.legacy_schema_aliasing": "false",
-        }
-        e = engine_from_config(
-            cfg, module=Mock(version="MS SQL Server 11.0.92")
-        )
-        eq_(e.dialect.legacy_schema_aliasing, False)
-
-
 class FastExecutemanyTest(fixtures.TestBase):
     __only_on__ = "mssql"
     __backend__ = True
index d8f2a4a0e62a09843b1ed27c872e46543b9a0124..e37b388e878fda918dae4cc93f43aab064bb1636 100644 (file)
@@ -18,8 +18,6 @@ from sqlalchemy import Table
 from sqlalchemy import testing
 from sqlalchemy import util
 from sqlalchemy.dialects.mssql import base as mssql
-from sqlalchemy.sql import column
-from sqlalchemy.sql import table
 from sqlalchemy.testing import AssertsCompiledSQL
 from sqlalchemy.testing import engines
 from sqlalchemy.testing import eq_
@@ -28,120 +26,11 @@ from sqlalchemy.testing.assertsql import CursorSQL
 from sqlalchemy.testing.assertsql import DialectSQL
 from sqlalchemy.util import ue
 
-
 metadata = None
 cattable = None
 matchtable = None
 
 
-class LegacySchemaAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
-    """Legacy behavior tried to prevent schema-qualified tables
-    from being rendered as dotted names, and were instead aliased.
-
-    This behavior no longer seems to be required.
-
-    """
-
-    def setup(self):
-        metadata = MetaData()
-        self.t1 = table(
-            "t1",
-            column("a", Integer),
-            column("b", String),
-            column("c", String),
-        )
-        self.t2 = Table(
-            "t2",
-            metadata,
-            Column("a", Integer),
-            Column("b", Integer),
-            Column("c", Integer),
-            schema="schema",
-        )
-
-    def _assert_sql(self, element, legacy_sql, modern_sql=None):
-        dialect = mssql.dialect(legacy_schema_aliasing=True)
-
-        self.assert_compile(element, legacy_sql, dialect=dialect)
-
-        dialect = mssql.dialect()
-        self.assert_compile(element, modern_sql or "foob", dialect=dialect)
-
-    def _legacy_dialect(self):
-        return mssql.dialect(legacy_schema_aliasing=True)
-
-    def test_result_map(self):
-        s = self.t2.select()
-        c = s.compile(dialect=self._legacy_dialect())
-        assert self.t2.c.a in set(c._create_result_map()["a"][1])
-
-    def test_result_map_use_labels(self):
-        s = self.t2.select(use_labels=True)
-        c = s.compile(dialect=self._legacy_dialect())
-        assert self.t2.c.a in set(c._create_result_map()["schema_t2_a"][1])
-
-    def test_straight_select(self):
-        self._assert_sql(
-            self.t2.select(),
-            "SELECT t2_1.a, t2_1.b, t2_1.c FROM [schema].t2 AS t2_1",
-            "SELECT [schema].t2.a, [schema].t2.b, "
-            "[schema].t2.c FROM [schema].t2",
-        )
-
-    def test_straight_select_use_labels(self):
-        self._assert_sql(
-            self.t2.select(use_labels=True),
-            "SELECT t2_1.a AS schema_t2_a, t2_1.b AS schema_t2_b, "
-            "t2_1.c AS schema_t2_c FROM [schema].t2 AS t2_1",
-            "SELECT [schema].t2.a AS schema_t2_a, "
-            "[schema].t2.b AS schema_t2_b, "
-            "[schema].t2.c AS schema_t2_c FROM [schema].t2",
-        )
-
-    def test_join_to_schema(self):
-        t1, t2 = self.t1, self.t2
-        self._assert_sql(
-            t1.join(t2, t1.c.a == t2.c.a).select(),
-            "SELECT t1.a, t1.b, t1.c, t2_1.a, t2_1.b, t2_1.c FROM t1 "
-            "JOIN [schema].t2 AS t2_1 ON t2_1.a = t1.a",
-            "SELECT t1.a, t1.b, t1.c, [schema].t2.a, [schema].t2.b, "
-            "[schema].t2.c FROM t1 JOIN [schema].t2 ON [schema].t2.a = t1.a",
-        )
-
-    def test_union_schema_to_non(self):
-        t1, t2 = self.t1, self.t2
-        s = (
-            select(t2.c.a, t2.c.b)
-            .apply_labels()
-            .union(select(t1.c.a, t1.c.b).apply_labels())
-            .alias()
-            .select()
-        )
-        self._assert_sql(
-            s,
-            "SELECT anon_1.schema_t2_a, anon_1.schema_t2_b FROM "
-            "(SELECT t2_1.a AS schema_t2_a, t2_1.b AS schema_t2_b "
-            "FROM [schema].t2 AS t2_1 UNION SELECT t1.a AS t1_a, "
-            "t1.b AS t1_b FROM t1) AS anon_1",
-            "SELECT anon_1.schema_t2_a, anon_1.schema_t2_b FROM "
-            "(SELECT [schema].t2.a AS schema_t2_a, [schema].t2.b AS "
-            "schema_t2_b FROM [schema].t2 UNION SELECT t1.a AS t1_a, "
-            "t1.b AS t1_b FROM t1) AS anon_1",
-        )
-
-    def test_column_subquery_to_alias(self):
-        a1 = self.t2.alias("a1")
-        s = select([self.t2, select(a1.c.a).scalar_subquery()])
-        self._assert_sql(
-            s,
-            "SELECT t2_1.a, t2_1.b, t2_1.c, "
-            "(SELECT a1.a FROM [schema].t2 AS a1) "
-            "AS anon_1 FROM [schema].t2 AS t2_1",
-            "SELECT [schema].t2.a, [schema].t2.b, [schema].t2.c, "
-            "(SELECT a1.a FROM [schema].t2 AS a1) AS anon_1 FROM [schema].t2",
-        )
-
-
 class IdentityInsertTest(fixtures.TestBase, AssertsCompiledSQL):
     __only_on__ = "mssql"
     __dialect__ = mssql.MSDialect()
@@ -433,44 +322,18 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
         )
 
     @testing.provide_metadata
-    def test_insertid_schema(self):
-        meta = self.metadata
-        eng = engines.testing_engine(
-            options=dict(legacy_schema_aliasing=False)
-        )
-        meta.bind = eng
-        conn = eng.connect()
-        conn.exec_driver_sql("create schema paj")
-
-        @event.listens_for(meta, "after_drop")
-        def cleanup(target, connection, **kw):
-            connection.exec_driver_sql("drop schema paj")
-
-        tbl = Table(
-            "test", meta, Column("id", Integer, primary_key=True), schema="paj"
-        )
-        tbl.create(conn)
-        conn.execute(tbl.insert(), {"id": 1})
-        eq_(conn.scalar(tbl.select()), 1)
-
-    @testing.provide_metadata
-    def test_insertid_schema_legacy(self):
+    def test_insertid_schema(self, connection):
         meta = self.metadata
-        eng = engines.testing_engine(options=dict(legacy_schema_aliasing=True))
-        meta.bind = eng
-        conn = eng.connect()
-        conn.exec_driver_sql("create schema paj")
-
-        @event.listens_for(meta, "after_drop")
-        def cleanup(target, connection, **kw):
-            connection.exec_driver_sql("drop schema paj")
 
         tbl = Table(
-            "test", meta, Column("id", Integer, primary_key=True), schema="paj"
+            "test",
+            meta,
+            Column("id", Integer, primary_key=True),
+            schema=testing.config.test_schema,
         )
-        tbl.create()
-        tbl.insert().execute({"id": 1})
-        eq_(tbl.select().scalar(), 1)
+        tbl.create(connection)
+        connection.execute(tbl.insert(), {"id": 1})
+        eq_(connection.scalar(tbl.select()), 1)
 
     @testing.provide_metadata
     def test_returning_no_autoinc(self, connection):
@@ -490,48 +353,20 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
         eq_(result.fetchall(), [(1, "somestring")])
 
     @testing.provide_metadata
-    def test_delete_schema(self):
+    def test_delete_schema(self, connection):
         meta = self.metadata
-        eng = engines.testing_engine(
-            options=dict(legacy_schema_aliasing=False)
-        )
-        meta.bind = eng
-        conn = eng.connect()
-        conn.exec_driver_sql("create schema paj")
-
-        @event.listens_for(meta, "after_drop")
-        def cleanup(target, connection, **kw):
-            connection.exec_driver_sql("drop schema paj")
 
         tbl = Table(
-            "test", meta, Column("id", Integer, primary_key=True), schema="paj"
-        )
-        tbl.create(conn)
-        conn.execute(tbl.insert(), {"id": 1})
-        eq_(conn.scalar(tbl.select()), 1)
-        conn.execute(tbl.delete(tbl.c.id == 1))
-        eq_(conn.scalar(tbl.select()), None)
-
-    @testing.provide_metadata
-    def test_delete_schema_legacy(self):
-        meta = self.metadata
-        eng = engines.testing_engine(options=dict(legacy_schema_aliasing=True))
-        meta.bind = eng
-        conn = eng.connect()
-        conn.exec_driver_sql("create schema paj")
-
-        @event.listens_for(meta, "after_drop")
-        def cleanup(target, connection, **kw):
-            connection.exec_driver_sql("drop schema paj")
-
-        tbl = Table(
-            "test", meta, Column("id", Integer, primary_key=True), schema="paj"
+            "test",
+            meta,
+            Column("id", Integer, primary_key=True),
+            schema=testing.config.test_schema,
         )
-        tbl.create(conn)
-        conn.execute(tbl.insert(), {"id": 1})
-        eq_(conn.scalar(tbl.select()), 1)
-        conn.execute(tbl.delete(tbl.c.id == 1))
-        eq_(conn.scalar(tbl.select()), None)
+        tbl.create(connection)
+        connection.execute(tbl.insert(), {"id": 1})
+        eq_(connection.scalar(tbl.select()), 1)
+        connection.execute(tbl.delete(tbl.c.id == 1))
+        eq_(connection.scalar(tbl.select()), None)
 
     @testing.provide_metadata
     def test_insertid_reserved(self, connection):