]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Added a new dialect flag to the MSSQL dialect
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 25 May 2015 01:02:29 +0000 (21:02 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 25 May 2015 01:02:29 +0000 (21:02 -0400)
``legacy_schema_aliasing`` which when set to False will disable a
very old and obsolete behavior, that of the compiler's
attempt to turn all schema-qualified table names into alias names,
to work around old and no longer locatable issues where SQL
server could not parse a multi-part identifier name in all
circumstances.   The behavior prevented more
sophisticated statements from working correctly, including those which
use hints, as well as CRUD statements that embed correlated SELECT
statements.  Rather than continue to repair the feature to work
with more complex statements, it's better to just disable it
as it should no longer be needed for any modern SQL server
version.  The flag defaults to True for the 1.0.x series, leaving
current behavior unchanged for this version series.  In the 1.1
series, it will default to False.  For the 1.0 series,
when not set to either value explicitly, a warning is emitted
when a schema-qualified table is first used in a statement, which
suggests that the flag be set to False for all modern SQL Server
versions.
fixes #3424
fixes #3430

doc/build/changelog/changelog_10.rst
lib/sqlalchemy/dialects/mssql/base.py
lib/sqlalchemy/testing/provision.py
test/dialect/mssql/test_compiler.py
test/dialect/mssql/test_query.py

index 917485582d484dc9571f98bbceb23789a116a35e..2ed4f85513bbc8da765e5bed44d49dc574fe6c4e 100644 (file)
 .. changelog::
     :version: 1.0.5
 
+    .. change::
+        :tags: bug, mssql
+        :tickets: 3424, 3430
+
+        Added a new dialect flag to the MSSQL dialect
+        ``legacy_schema_aliasing`` which when set to False will disable a
+        very old and obsolete behavior, that of the compiler's
+        attempt to turn all schema-qualified table names into alias names,
+        to work around old and no longer locatable issues where SQL
+        server could not parse a multi-part identifier name in all
+        circumstances.   The behavior prevented more
+        sophisticated statements from working correctly, including those which
+        use hints, as well as CRUD statements that embed correlated SELECT
+        statements.  Rather than continue to repair the feature to work
+        with more complex statements, it's better to just disable it
+        as it should no longer be needed for any modern SQL server
+        version.  The flag defaults to True for the 1.0.x series, leaving
+        current behavior unchanged for this version series.  In the 1.1
+        series, it will default to False.  For the 1.0 series,
+        when not set to either value explicitly, a warning is emitted
+        when a schema-qualified table is first used in a statement, which
+        suggests that the flag be set to False for all modern SQL Server
+        versions.
+
+        .. seealso::
+
+            :ref:`legacy_schema_rendering`
+
     .. change::
         :tags: feature, engine
         :tickets: 3379
index b073af6af00b05eea644d24669c324f50e2edc83..d61ab19583b194207bbf8623cad1f62156b03caf 100644 (file)
@@ -166,6 +166,55 @@ how SQLAlchemy handles this:
 This
 is an auxilliary use case suitable for testing and bulk insert scenarios.
 
+.. _legacy_schema_rendering:
+
+Rendering of SQL statements that include schema qualifiers
+---------------------------------------------------------
+
+When using :class:`.Table` metadata that includes a "schema" qualifier,
+such as::
+
+    account_table = Table(
+        'account', metadata,
+        Column('id', Integer, primary_key=True),
+        Column('info', String(100)),
+        schema="customer_schema"
+    )
+
+The SQL Server dialect has a long-standing behavior that it will attempt
+to turn a schema-qualified table name into an alias, such as::
+
+    >>> eng = create_engine("mssql+pymssql://mydsn")
+    >>> print(account_table.select().compile(eng))
+    SELECT account_1.id, account_1.info
+    FROM customer_schema.account AS account_1
+
+This behavior is legacy, does not function correctly for many forms
+of SQL statements, and will be disabled by default in the 1.1 series
+of SQLAlchemy.   As of 1.0.5, the above statement will produce the following
+warning::
+
+    SAWarning: legacy_schema_aliasing flag is defaulted to True;
+      some schema-qualified queries may not function correctly.
+      Consider setting this flag to False for modern SQL Server versions;
+      this flag will default to False in version 1.1
+
+This warning encourages the :class:`.Engine` to be created as follows::
+
+    >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=False)
+
+Where the above SELECT statement will produce::
+
+    >>> print(account_table.select().compile(eng))
+    SELECT customer_schema.account.id, customer_schema.account.info
+    FROM customer_schema.account
+
+The warning will not emit if the ``legacy_schema_aliasing`` flag is set
+to either True or False.
+
+.. versionadded:: 1.0.5 - Added the ``legacy_schema_aliasing`` flag to disable
+   the SQL Server dialect's legacy behavior with schema-qualified table
+   names.  This flag will default to False in version 1.1.
 
 Collation Support
 -----------------
@@ -951,6 +1000,15 @@ class MSSQLCompiler(compiler.SQLCompiler):
         self.tablealiases = {}
         super(MSSQLCompiler, self).__init__(*args, **kwargs)
 
+    def _with_legacy_schema_aliasing(fn):
+        def decorate(self, *arg, **kw):
+            if self.dialect.legacy_schema_aliasing:
+                return fn(self, *arg, **kw)
+            else:
+                super_ = getattr(super(MSSQLCompiler, self), fn.__name__)
+                return super_(*arg, **kw)
+        return decorate
+
     def visit_now_func(self, fn, **kw):
         return "CURRENT_TIMESTAMP"
 
@@ -1054,14 +1112,7 @@ class MSSQLCompiler(compiler.SQLCompiler):
         else:
             return compiler.SQLCompiler.visit_select(self, select, **kwargs)
 
-    def _schema_aliased_table(self, table):
-        if getattr(table, 'schema', None) is not None:
-            if table not in self.tablealiases:
-                self.tablealiases[table] = table.alias()
-            return self.tablealiases[table]
-        else:
-            return None
-
+    @_with_legacy_schema_aliasing
     def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs):
         if mssql_aliased is table or iscrud:
             return super(MSSQLCompiler, self).visit_table(table, **kwargs)
@@ -1073,25 +1124,14 @@ class MSSQLCompiler(compiler.SQLCompiler):
         else:
             return super(MSSQLCompiler, self).visit_table(table, **kwargs)
 
-    def visit_alias(self, alias, **kwargs):
+    @_with_legacy_schema_aliasing
+    def visit_alias(self, alias, **kw):
         # translate for schema-qualified table aliases
-        kwargs['mssql_aliased'] = alias.original
-        return super(MSSQLCompiler, self).visit_alias(alias, **kwargs)
-
-    def visit_extract(self, extract, **kw):
-        field = self.extract_map.get(extract.field, extract.field)
-        return 'DATEPART("%s", %s)' % \
-            (field, self.process(extract.expr, **kw))
-
-    def visit_savepoint(self, savepoint_stmt):
-        return "SAVE TRANSACTION %s" % \
-            self.preparer.format_savepoint(savepoint_stmt)
-
-    def visit_rollback_to_savepoint(self, savepoint_stmt):
-        return ("ROLLBACK TRANSACTION %s"
-                % self.preparer.format_savepoint(savepoint_stmt))
+        kw['mssql_aliased'] = alias.original
+        return super(MSSQLCompiler, self).visit_alias(alias, **kw)
 
-    def visit_column(self, column, add_to_result_map=None, **kwargs):
+    @_with_legacy_schema_aliasing
+    def visit_column(self, column, add_to_result_map=None, **kw):
         if column.table is not None and \
                 (not self.isupdate and not self.isdelete) or \
                 self.is_subquery():
@@ -1109,10 +1149,40 @@ class MSSQLCompiler(compiler.SQLCompiler):
                     )
 
                 return super(MSSQLCompiler, self).\
-                    visit_column(converted, **kwargs)
+                    visit_column(converted, **kw)
 
         return super(MSSQLCompiler, self).visit_column(
-            column, add_to_result_map=add_to_result_map, **kwargs)
+            column, add_to_result_map=add_to_result_map, **kw)
+
+    def _schema_aliased_table(self, table):
+        if getattr(table, 'schema', None) is not None:
+            if self.dialect._warn_schema_aliasing and \
+                    table.schema.lower() != 'information_schema':
+                util.warn(
+                    "legacy_schema_aliasing flag is defaulted to True; "
+                    "some schema-qualified queries may not function "
+                    "correctly. Consider setting this flag to False for "
+                    "modern SQL Server versions; this flag will default to "
+                    "False in version 1.1")
+
+            if table not in self.tablealiases:
+                self.tablealiases[table] = table.alias()
+            return self.tablealiases[table]
+        else:
+            return None
+
+    def visit_extract(self, extract, **kw):
+        field = self.extract_map.get(extract.field, extract.field)
+        return 'DATEPART("%s", %s)' % \
+            (field, self.process(extract.expr, **kw))
+
+    def visit_savepoint(self, savepoint_stmt):
+        return "SAVE TRANSACTION %s" % \
+            self.preparer.format_savepoint(savepoint_stmt)
+
+    def visit_rollback_to_savepoint(self, savepoint_stmt):
+        return ("ROLLBACK TRANSACTION %s"
+                % self.preparer.format_savepoint(savepoint_stmt))
 
     def visit_binary(self, binary, **kwargs):
         """Move bind parameters to the right-hand side of an operator, where
@@ -1455,7 +1525,8 @@ class MSDialect(default.DefaultDialect):
                  use_scope_identity=True,
                  max_identifier_length=None,
                  schema_name="dbo",
-                 deprecate_large_types=None, **opts):
+                 deprecate_large_types=None,
+                 legacy_schema_aliasing=None, **opts):
         self.query_timeout = int(query_timeout or 0)
         self.schema_name = schema_name
 
@@ -1463,6 +1534,14 @@ class MSDialect(default.DefaultDialect):
         self.max_identifier_length = int(max_identifier_length or 0) or \
             self.max_identifier_length
         self.deprecate_large_types = deprecate_large_types
+
+        if legacy_schema_aliasing is None:
+            self.legacy_schema_aliasing = True
+            self._warn_schema_aliasing = True
+        else:
+            self.legacy_schema_aliasing = legacy_schema_aliasing
+            self._warn_schema_aliasing = False
+
         super(MSDialect, self).__init__(**opts)
 
     def do_savepoint(self, connection, name):
index c8f7fdf304331503dc6ffd7873ca38732ac7ffb3..8469a0658964411154a2f8da3ef35ef6fdce016b 100644 (file)
@@ -49,6 +49,7 @@ def configure_follower(follower_ident):
 def setup_config(db_url, db_opts, options, file_config, follower_ident):
     if follower_ident:
         db_url = _follower_url_from_main(db_url, follower_ident)
+    _update_db_opts(db_url, db_opts)
     eng = engines.testing_engine(db_url, db_opts)
     eng.connect().close()
     cfg = config.Config.register(eng, db_opts, options, file_config)
@@ -93,6 +94,11 @@ def _drop_db(cfg, eng, ident):
     raise NotImplementedError("no DB drop routine for cfg: %s" % eng.url)
 
 
+@register.init
+def _update_db_opts(db_url, db_opts):
+    pass
+
+
 @register.init
 def _configure_follower(cfg, ident):
     pass
@@ -105,6 +111,11 @@ def _follower_url_from_main(url, ident):
     return url
 
 
+@_update_db_opts.for_db("mssql")
+def _mssql_update_db_opts(db_url, db_opts):
+    db_opts['legacy_schema_aliasing'] = False
+
+
 @_follower_url_from_main.for_db("sqlite")
 def _sqlite_follower_url_from_main(url, ident):
     url = sa_url.make_url(url)
index 32686f5444413bd6c19772d2192bcf9fcaa2102d..9d89f040b3160499684e6387342a69ff065ff81b 100644 (file)
@@ -12,7 +12,7 @@ from sqlalchemy import Integer, String, Table, Column, select, MetaData,\
 
 
 class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
-    __dialect__ = mssql.dialect()
+    __dialect__ = mssql.dialect(legacy_schema_aliasing=False)
 
     def test_true_false(self):
         self.assert_compile(
@@ -34,6 +34,15 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
             t.select().with_hint(t, 'WITH (NOLOCK)'),
             'SELECT sometable.somecolumn FROM sometable WITH (NOLOCK)')
 
+    def test_select_with_nolock_schema(self):
+        m = MetaData()
+        t = Table('sometable', m, Column('somecolumn', Integer),
+                  schema='test_schema')
+        self.assert_compile(
+            t.select().with_hint(t, 'WITH (NOLOCK)'),
+            'SELECT test_schema.sometable.somecolumn '
+            'FROM test_schema.sometable WITH (NOLOCK)')
+
     def test_join_with_hint(self):
         t1 = table('t1',
                    column('a', Integer),
@@ -143,6 +152,39 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
                 "WHERE sometable.somecolumn = othertable.somecolumn"
             )
 
+    def test_update_to_select_schema(self):
+        meta = MetaData()
+        table = Table(
+            "sometable", meta,
+            Column("sym", String),
+            Column("val", Integer),
+            schema="schema"
+        )
+        other = Table(
+            "#other", meta,
+            Column("sym", String),
+            Column("newval", Integer)
+        )
+        stmt = table.update().values(
+            val=select([other.c.newval]).
+            where(table.c.sym == other.c.sym).as_scalar())
+
+        self.assert_compile(
+            stmt,
+            "UPDATE [schema].sometable SET val="
+            "(SELECT [#other].newval FROM [#other] "
+            "WHERE [schema].sometable.sym = [#other].sym)",
+        )
+
+        stmt = table.update().values(val=other.c.newval).\
+            where(table.c.sym == other.c.sym)
+        self.assert_compile(
+            stmt,
+            "UPDATE [schema].sometable SET val="
+            "[#other].newval FROM [schema].sometable, "
+            "[#other] WHERE [schema].sometable.sym = [#other].sym",
+        )
+
     # TODO: not supported yet.
     # def test_delete_from_hint(self):
     #    t = table('sometable', column('somecolumn'))
@@ -236,8 +278,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
         s = select([tbl.c.id]).where(tbl.c.id == 1)
         self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)),
                             'DELETE FROM paj.test WHERE paj.test.id IN '
-                            '(SELECT test_1.id FROM paj.test AS test_1 '
-                            'WHERE test_1.id = :id_1)')
+                            '(SELECT paj.test.id FROM paj.test '
+                            'WHERE paj.test.id = :id_1)')
 
     def test_delete_schema_multipart(self):
         metadata = MetaData()
@@ -252,9 +294,9 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
         s = select([tbl.c.id]).where(tbl.c.id == 1)
         self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)),
                             'DELETE FROM banana.paj.test WHERE '
-                            'banana.paj.test.id IN (SELECT test_1.id '
-                            'FROM banana.paj.test AS test_1 WHERE '
-                            'test_1.id = :id_1)')
+                            'banana.paj.test.id IN (SELECT banana.paj.test.id '
+                            'FROM banana.paj.test WHERE '
+                            'banana.paj.test.id = :id_1)')
 
     def test_delete_schema_multipart_needs_quoting(self):
         metadata = MetaData()
@@ -268,9 +310,11 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
         s = select([tbl.c.id]).where(tbl.c.id == 1)
         self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)),
                             'DELETE FROM [banana split].paj.test WHERE '
-                            '[banana split].paj.test.id IN (SELECT '
-                            'test_1.id FROM [banana split].paj.test AS '
-                            'test_1 WHERE test_1.id = :id_1)')
+                            '[banana split].paj.test.id IN ('
+
+                            'SELECT [banana split].paj.test.id FROM '
+                            '[banana split].paj.test WHERE '
+                            '[banana split].paj.test.id = :id_1)')
 
     def test_delete_schema_multipart_both_need_quoting(self):
         metadata = MetaData()
@@ -282,13 +326,14 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
                             'space].test WHERE [banana split].[paj '
                             'with a space].test.id = :id_1')
         s = select([tbl.c.id]).where(tbl.c.id == 1)
-        self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)),
-                            'DELETE FROM [banana split].[paj with a '
-                            'space].test WHERE [banana split].[paj '
-                            'with a space].test.id IN (SELECT '
-                            'test_1.id FROM [banana split].[paj with a '
-                            'space].test AS test_1 WHERE test_1.id = '
-                            ':id_1)')
+        self.assert_compile(
+            tbl.delete().where(tbl.c.id.in_(s)),
+            "DELETE FROM [banana split].[paj with a space].test "
+            "WHERE [banana split].[paj with a space].test.id IN "
+            "(SELECT [banana split].[paj with a space].test.id "
+            "FROM [banana split].[paj with a space].test "
+            "WHERE [banana split].[paj with a space].test.id = :id_1)"
+        )
 
     def test_union(self):
         t1 = table(
index 19c0dd90f2cbc018106065e0aa6786ad23cd70a0..61ae32ef4a996aae43648936877a48328fb37c16 100644 (file)
@@ -2,7 +2,7 @@
 from sqlalchemy.testing import eq_, engines
 from sqlalchemy.sql import table, column
 from sqlalchemy.databases import mssql
-from sqlalchemy.testing import fixtures, AssertsCompiledSQL
+from sqlalchemy.testing import fixtures, AssertsCompiledSQL, assertions
 from sqlalchemy import testing
 from sqlalchemy.util import ue
 from sqlalchemy import util
@@ -16,20 +16,22 @@ cattable = None
 matchtable = None
 
 
-class SchemaAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
+class LegacySchemaAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
+    """Legacy behavior tried to prevent schema-qualified tables
+    from being rendered as dotted names, and were instead aliased.
+
+    This behavior no longer seems to be required.
 
-    """SQL server cannot reference schema-qualified tables in a SELECT
-    statement, they must be aliased.
     """
-    __dialect__ = mssql.dialect()
 
     def setup(self):
         metadata = MetaData()
-        self.t1 = table('t1',
-                        column('a', Integer),
-                        column('b', String),
-                        column('c', String),
-                        )
+        self.t1 = table(
+            't1',
+            column('a', Integer),
+            column('b', String),
+            column('c', String),
+        )
         self.t2 = Table(
             't2', metadata,
             Column("a", Integer),
@@ -38,59 +40,95 @@ class SchemaAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
             schema='schema'
         )
 
+    def _assert_sql(self, element, legacy_sql, modern_sql=None):
+        dialect = mssql.dialect()
+
+        with assertions.expect_warnings(
+                "legacy_schema_aliasing flag is defaulted to True.*"):
+            self.assert_compile(
+                element,
+                legacy_sql,
+                dialect=dialect
+            )
+
+        dialect = mssql.dialect(legacy_schema_aliasing=False)
+        self.assert_compile(
+            element,
+            modern_sql or "foob",
+            dialect=dialect
+        )
+
+    def _legacy_dialect(self):
+        return mssql.dialect(legacy_schema_aliasing=True)
+
     def test_result_map(self):
         s = self.t2.select()
-        c = s.compile(dialect=self.__dialect__)
+        c = s.compile(dialect=self._legacy_dialect())
         assert self.t2.c.a in set(c._create_result_map()['a'][1])
 
     def test_result_map_use_labels(self):
         s = self.t2.select(use_labels=True)
-        c = s.compile(dialect=self.__dialect__)
+        c = s.compile(dialect=self._legacy_dialect())
         assert self.t2.c.a in set(c._create_result_map()['schema_t2_a'][1])
 
     def test_straight_select(self):
-        self.assert_compile(
+        self._assert_sql(
             self.t2.select(),
-            "SELECT t2_1.a, t2_1.b, t2_1.c FROM [schema].t2 AS t2_1"
+            "SELECT t2_1.a, t2_1.b, t2_1.c FROM [schema].t2 AS t2_1",
+            "SELECT [schema].t2.a, [schema].t2.b, "
+            "[schema].t2.c FROM [schema].t2"
         )
 
     def test_straight_select_use_labels(self):
-        self.assert_compile(
+        self._assert_sql(
             self.t2.select(use_labels=True),
             "SELECT t2_1.a AS schema_t2_a, t2_1.b AS schema_t2_b, "
-            "t2_1.c AS schema_t2_c FROM [schema].t2 AS t2_1"
+            "t2_1.c AS schema_t2_c FROM [schema].t2 AS t2_1",
+            "SELECT [schema].t2.a AS schema_t2_a, "
+            "[schema].t2.b AS schema_t2_b, "
+            "[schema].t2.c AS schema_t2_c FROM [schema].t2"
         )
 
     def test_join_to_schema(self):
         t1, t2 = self.t1, self.t2
-        self.assert_compile(
+        self._assert_sql(
             t1.join(t2, t1.c.a == t2.c.a).select(),
             "SELECT t1.a, t1.b, t1.c, t2_1.a, t2_1.b, t2_1.c FROM t1 "
-            "JOIN [schema].t2 AS t2_1 ON t2_1.a = t1.a"
+            "JOIN [schema].t2 AS t2_1 ON t2_1.a = t1.a",
+
+            "SELECT t1.a, t1.b, t1.c, [schema].t2.a, [schema].t2.b, "
+            "[schema].t2.c FROM t1 JOIN [schema].t2 ON [schema].t2.a = t1.a"
         )
 
     def test_union_schema_to_non(self):
         t1, t2 = self.t1, self.t2
         s = select([t2.c.a, t2.c.b]).apply_labels().\
             union(
-            select([t1.c.a, t1.c.b]).apply_labels()
-        ).alias().select()
-        self.assert_compile(
+                select([t1.c.a, t1.c.b]).apply_labels()).alias().select()
+        self._assert_sql(
             s,
             "SELECT anon_1.schema_t2_a, anon_1.schema_t2_b FROM "
             "(SELECT t2_1.a AS schema_t2_a, t2_1.b AS schema_t2_b "
             "FROM [schema].t2 AS t2_1 UNION SELECT t1.a AS t1_a, "
+            "t1.b AS t1_b FROM t1) AS anon_1",
+
+            "SELECT anon_1.schema_t2_a, anon_1.schema_t2_b FROM "
+            "(SELECT [schema].t2.a AS schema_t2_a, [schema].t2.b AS "
+            "schema_t2_b FROM [schema].t2 UNION SELECT t1.a AS t1_a, "
             "t1.b AS t1_b FROM t1) AS anon_1"
         )
 
     def test_column_subquery_to_alias(self):
         a1 = self.t2.alias('a1')
         s = select([self.t2, select([a1.c.a]).as_scalar()])
-        self.assert_compile(
+        self._assert_sql(
             s,
             "SELECT t2_1.a, t2_1.b, t2_1.c, "
             "(SELECT a1.a FROM [schema].t2 AS a1) "
-            "AS anon_1 FROM [schema].t2 AS t2_1"
+            "AS anon_1 FROM [schema].t2 AS t2_1",
+
+            "SELECT [schema].t2.a, [schema].t2.b, [schema].t2.c, "
+            "(SELECT a1.a FROM [schema].t2 AS a1) AS anon_1 FROM [schema].t2"
 
         )
 
@@ -315,7 +353,29 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
     @testing.provide_metadata
     def test_insertid_schema(self):
         meta = self.metadata
-        con = testing.db.connect()
+        eng = engines.testing_engine(
+            options=dict(legacy_schema_aliasing=False))
+        meta.bind = eng
+        con = eng.connect()
+        con.execute('create schema paj')
+
+        @event.listens_for(meta, "after_drop")
+        def cleanup(target, connection, **kw):
+            connection.execute('drop schema paj')
+
+        tbl = Table('test', meta,
+                    Column('id', Integer, primary_key=True), schema='paj')
+        tbl.create()
+        tbl.insert().execute({'id': 1})
+        eq_(tbl.select().scalar(), 1)
+
+    @testing.provide_metadata
+    def test_insertid_schema_legacy(self):
+        meta = self.metadata
+        eng = engines.testing_engine(
+            options=dict(legacy_schema_aliasing=True))
+        meta.bind = eng
+        con = eng.connect()
         con.execute('create schema paj')
 
         @event.listens_for(meta, "after_drop")
@@ -345,7 +405,32 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
     @testing.provide_metadata
     def test_delete_schema(self):
         meta = self.metadata
-        con = testing.db.connect()
+        eng = engines.testing_engine(
+            options=dict(legacy_schema_aliasing=False))
+        meta.bind = eng
+        con = eng.connect()
+        con.execute('create schema paj')
+
+        @event.listens_for(meta, "after_drop")
+        def cleanup(target, connection, **kw):
+            connection.execute('drop schema paj')
+
+        tbl = Table(
+            'test', meta,
+            Column('id', Integer, primary_key=True), schema='paj')
+        tbl.create()
+        tbl.insert().execute({'id': 1})
+        eq_(tbl.select().scalar(), 1)
+        tbl.delete(tbl.c.id == 1).execute()
+        eq_(tbl.select().scalar(), None)
+
+    @testing.provide_metadata
+    def test_delete_schema_legacy(self):
+        meta = self.metadata
+        eng = engines.testing_engine(
+            options=dict(legacy_schema_aliasing=True))
+        meta.bind = eng
+        con = eng.connect()
         con.execute('create schema paj')
 
         @event.listens_for(meta, "after_drop")