.. changelog::
:version: 1.0.5
+ .. change::
+ :tags: bug, mssql
+ :tickets: 3424, 3430
+
+ Added a new dialect flag to the MSSQL dialect
+ ``legacy_schema_aliasing`` which when set to False will disable a
+ very old and obsolete behavior, that of the compiler's
+ attempt to turn all schema-qualified table names into alias names,
+ to work around old and no longer locatable issues where SQL
+ Server could not parse a multi-part identifier name in all
+ circumstances. The behavior prevented more
+ sophisticated statements from working correctly, including those which
+ use hints, as well as CRUD statements that embed correlated SELECT
+ statements. Rather than continue to repair the feature to work
+ with more complex statements, it's better to just disable it
+ as it should no longer be needed for any modern SQL Server
+ version. The flag defaults to True for the 1.0.x series, leaving
+ current behavior unchanged for this version series. In the 1.1
+ series, it will default to False. For the 1.0 series,
+ when not set to either value explicitly, a warning is emitted
+ when a schema-qualified table is first used in a statement, which
+ suggests that the flag be set to False for all modern SQL Server
+ versions.
+
+ .. seealso::
+
+ :ref:`legacy_schema_rendering`
+
.. change::
:tags: feature, engine
:tickets: 3379
This
is an auxiliary use case suitable for testing and bulk insert scenarios.
+.. _legacy_schema_rendering:
+
+Rendering of SQL statements that include schema qualifiers
+---------------------------------------------------------
+
+When using :class:`.Table` metadata that includes a "schema" qualifier,
+such as::
+
+ account_table = Table(
+ 'account', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('info', String(100)),
+ schema="customer_schema"
+ )
+
+The SQL Server dialect has a long-standing behavior that it will attempt
+to turn a schema-qualified table name into an alias, such as::
+
+ >>> eng = create_engine("mssql+pymssql://mydsn")
+ >>> print(account_table.select().compile(eng))
+ SELECT account_1.id, account_1.info
+ FROM customer_schema.account AS account_1
+
+This behavior is legacy, does not function correctly for many forms
+of SQL statements, and will be disabled by default in the 1.1 series
+of SQLAlchemy. As of 1.0.5, the above statement will produce the following
+warning::
+
+ SAWarning: legacy_schema_aliasing flag is defaulted to True;
+ some schema-qualified queries may not function correctly.
+ Consider setting this flag to False for modern SQL Server versions;
+ this flag will default to False in version 1.1
+
+When this warning is emitted, it is recommended that the :class:`.Engine` be created as follows::
+
+ >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=False)
+
+Where the above SELECT statement will produce::
+
+ >>> print(account_table.select().compile(eng))
+ SELECT customer_schema.account.id, customer_schema.account.info
+ FROM customer_schema.account
+
+The warning will not emit if the ``legacy_schema_aliasing`` flag is set
+to either True or False.
+
+.. versionadded:: 1.0.5 - Added the ``legacy_schema_aliasing`` flag to disable
+ the SQL Server dialect's legacy behavior with schema-qualified table
+ names. This flag will default to False in version 1.1.
Collation Support
-----------------
self.tablealiases = {}
super(MSSQLCompiler, self).__init__(*args, **kwargs)
+ def _with_legacy_schema_aliasing(fn):
+ def decorate(self, *arg, **kw):
+ if self.dialect.legacy_schema_aliasing:
+ return fn(self, *arg, **kw)
+ else:
+ super_ = getattr(super(MSSQLCompiler, self), fn.__name__)
+ return super_(*arg, **kw)
+ return decorate
+
def visit_now_func(self, fn, **kw):
return "CURRENT_TIMESTAMP"
else:
return compiler.SQLCompiler.visit_select(self, select, **kwargs)
- def _schema_aliased_table(self, table):
- if getattr(table, 'schema', None) is not None:
- if table not in self.tablealiases:
- self.tablealiases[table] = table.alias()
- return self.tablealiases[table]
- else:
- return None
-
+ @_with_legacy_schema_aliasing
def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs):
if mssql_aliased is table or iscrud:
return super(MSSQLCompiler, self).visit_table(table, **kwargs)
else:
return super(MSSQLCompiler, self).visit_table(table, **kwargs)
- def visit_alias(self, alias, **kwargs):
+ @_with_legacy_schema_aliasing
+ def visit_alias(self, alias, **kw):
# translate for schema-qualified table aliases
- kwargs['mssql_aliased'] = alias.original
- return super(MSSQLCompiler, self).visit_alias(alias, **kwargs)
-
- def visit_extract(self, extract, **kw):
- field = self.extract_map.get(extract.field, extract.field)
- return 'DATEPART("%s", %s)' % \
- (field, self.process(extract.expr, **kw))
-
- def visit_savepoint(self, savepoint_stmt):
- return "SAVE TRANSACTION %s" % \
- self.preparer.format_savepoint(savepoint_stmt)
-
- def visit_rollback_to_savepoint(self, savepoint_stmt):
- return ("ROLLBACK TRANSACTION %s"
- % self.preparer.format_savepoint(savepoint_stmt))
+ kw['mssql_aliased'] = alias.original
+ return super(MSSQLCompiler, self).visit_alias(alias, **kw)
- def visit_column(self, column, add_to_result_map=None, **kwargs):
+ @_with_legacy_schema_aliasing
+ def visit_column(self, column, add_to_result_map=None, **kw):
if column.table is not None and \
(not self.isupdate and not self.isdelete) or \
self.is_subquery():
)
return super(MSSQLCompiler, self).\
- visit_column(converted, **kwargs)
+ visit_column(converted, **kw)
return super(MSSQLCompiler, self).visit_column(
- column, add_to_result_map=add_to_result_map, **kwargs)
+ column, add_to_result_map=add_to_result_map, **kw)
+
+ def _schema_aliased_table(self, table):
+ if getattr(table, 'schema', None) is not None:
+ if self.dialect._warn_schema_aliasing and \
+ table.schema.lower() != 'information_schema':
+ util.warn(
+ "legacy_schema_aliasing flag is defaulted to True; "
+ "some schema-qualified queries may not function "
+ "correctly. Consider setting this flag to False for "
+ "modern SQL Server versions; this flag will default to "
+ "False in version 1.1")
+
+ if table not in self.tablealiases:
+ self.tablealiases[table] = table.alias()
+ return self.tablealiases[table]
+ else:
+ return None
+
+ def visit_extract(self, extract, **kw):
+ field = self.extract_map.get(extract.field, extract.field)
+ return 'DATEPART("%s", %s)' % \
+ (field, self.process(extract.expr, **kw))
+
+ def visit_savepoint(self, savepoint_stmt):
+ return "SAVE TRANSACTION %s" % \
+ self.preparer.format_savepoint(savepoint_stmt)
+
+ def visit_rollback_to_savepoint(self, savepoint_stmt):
+ return ("ROLLBACK TRANSACTION %s"
+ % self.preparer.format_savepoint(savepoint_stmt))
def visit_binary(self, binary, **kwargs):
"""Move bind parameters to the right-hand side of an operator, where
use_scope_identity=True,
max_identifier_length=None,
schema_name="dbo",
- deprecate_large_types=None, **opts):
+ deprecate_large_types=None,
+ legacy_schema_aliasing=None, **opts):
self.query_timeout = int(query_timeout or 0)
self.schema_name = schema_name
self.max_identifier_length = int(max_identifier_length or 0) or \
self.max_identifier_length
self.deprecate_large_types = deprecate_large_types
+
+ if legacy_schema_aliasing is None:
+ self.legacy_schema_aliasing = True
+ self._warn_schema_aliasing = True
+ else:
+ self.legacy_schema_aliasing = legacy_schema_aliasing
+ self._warn_schema_aliasing = False
+
super(MSDialect, self).__init__(**opts)
def do_savepoint(self, connection, name):
def setup_config(db_url, db_opts, options, file_config, follower_ident):
if follower_ident:
db_url = _follower_url_from_main(db_url, follower_ident)
+ _update_db_opts(db_url, db_opts)
eng = engines.testing_engine(db_url, db_opts)
eng.connect().close()
cfg = config.Config.register(eng, db_opts, options, file_config)
raise NotImplementedError("no DB drop routine for cfg: %s" % eng.url)
+@register.init
+def _update_db_opts(db_url, db_opts):
+ pass
+
+
@register.init
def _configure_follower(cfg, ident):
pass
return url
+@_update_db_opts.for_db("mssql")
+def _mssql_update_db_opts(db_url, db_opts):
+ db_opts['legacy_schema_aliasing'] = False
+
+
@_follower_url_from_main.for_db("sqlite")
def _sqlite_follower_url_from_main(url, ident):
url = sa_url.make_url(url)
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = mssql.dialect()
+ __dialect__ = mssql.dialect(legacy_schema_aliasing=False)
def test_true_false(self):
self.assert_compile(
t.select().with_hint(t, 'WITH (NOLOCK)'),
'SELECT sometable.somecolumn FROM sometable WITH (NOLOCK)')
+ def test_select_with_nolock_schema(self):
+ m = MetaData()
+ t = Table('sometable', m, Column('somecolumn', Integer),
+ schema='test_schema')
+ self.assert_compile(
+ t.select().with_hint(t, 'WITH (NOLOCK)'),
+ 'SELECT test_schema.sometable.somecolumn '
+ 'FROM test_schema.sometable WITH (NOLOCK)')
+
def test_join_with_hint(self):
t1 = table('t1',
column('a', Integer),
"WHERE sometable.somecolumn = othertable.somecolumn"
)
+ def test_update_to_select_schema(self):
+ meta = MetaData()
+ table = Table(
+ "sometable", meta,
+ Column("sym", String),
+ Column("val", Integer),
+ schema="schema"
+ )
+ other = Table(
+ "#other", meta,
+ Column("sym", String),
+ Column("newval", Integer)
+ )
+ stmt = table.update().values(
+ val=select([other.c.newval]).
+ where(table.c.sym == other.c.sym).as_scalar())
+
+ self.assert_compile(
+ stmt,
+ "UPDATE [schema].sometable SET val="
+ "(SELECT [#other].newval FROM [#other] "
+ "WHERE [schema].sometable.sym = [#other].sym)",
+ )
+
+ stmt = table.update().values(val=other.c.newval).\
+ where(table.c.sym == other.c.sym)
+ self.assert_compile(
+ stmt,
+ "UPDATE [schema].sometable SET val="
+ "[#other].newval FROM [schema].sometable, "
+ "[#other] WHERE [schema].sometable.sym = [#other].sym",
+ )
+
# TODO: not supported yet.
# def test_delete_from_hint(self):
# t = table('sometable', column('somecolumn'))
s = select([tbl.c.id]).where(tbl.c.id == 1)
self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)),
'DELETE FROM paj.test WHERE paj.test.id IN '
- '(SELECT test_1.id FROM paj.test AS test_1 '
- 'WHERE test_1.id = :id_1)')
+ '(SELECT paj.test.id FROM paj.test '
+ 'WHERE paj.test.id = :id_1)')
def test_delete_schema_multipart(self):
metadata = MetaData()
s = select([tbl.c.id]).where(tbl.c.id == 1)
self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)),
'DELETE FROM banana.paj.test WHERE '
- 'banana.paj.test.id IN (SELECT test_1.id '
- 'FROM banana.paj.test AS test_1 WHERE '
- 'test_1.id = :id_1)')
+ 'banana.paj.test.id IN (SELECT banana.paj.test.id '
+ 'FROM banana.paj.test WHERE '
+ 'banana.paj.test.id = :id_1)')
def test_delete_schema_multipart_needs_quoting(self):
metadata = MetaData()
s = select([tbl.c.id]).where(tbl.c.id == 1)
self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)),
'DELETE FROM [banana split].paj.test WHERE '
- '[banana split].paj.test.id IN (SELECT '
- 'test_1.id FROM [banana split].paj.test AS '
- 'test_1 WHERE test_1.id = :id_1)')
+ '[banana split].paj.test.id IN ('
+
+ 'SELECT [banana split].paj.test.id FROM '
+ '[banana split].paj.test WHERE '
+ '[banana split].paj.test.id = :id_1)')
def test_delete_schema_multipart_both_need_quoting(self):
metadata = MetaData()
'space].test WHERE [banana split].[paj '
'with a space].test.id = :id_1')
s = select([tbl.c.id]).where(tbl.c.id == 1)
- self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)),
- 'DELETE FROM [banana split].[paj with a '
- 'space].test WHERE [banana split].[paj '
- 'with a space].test.id IN (SELECT '
- 'test_1.id FROM [banana split].[paj with a '
- 'space].test AS test_1 WHERE test_1.id = '
- ':id_1)')
+ self.assert_compile(
+ tbl.delete().where(tbl.c.id.in_(s)),
+ "DELETE FROM [banana split].[paj with a space].test "
+ "WHERE [banana split].[paj with a space].test.id IN "
+ "(SELECT [banana split].[paj with a space].test.id "
+ "FROM [banana split].[paj with a space].test "
+ "WHERE [banana split].[paj with a space].test.id = :id_1)"
+ )
def test_union(self):
t1 = table(
from sqlalchemy.testing import eq_, engines
from sqlalchemy.sql import table, column
from sqlalchemy.databases import mssql
-from sqlalchemy.testing import fixtures, AssertsCompiledSQL
+from sqlalchemy.testing import fixtures, AssertsCompiledSQL, assertions
from sqlalchemy import testing
from sqlalchemy.util import ue
from sqlalchemy import util
matchtable = None
-class SchemaAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
+class LegacySchemaAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
+ """Legacy behavior attempted to prevent schema-qualified tables
+ from being rendered as dotted names, aliasing them instead.
+
+ This behavior no longer seems to be required.
- """SQL server cannot reference schema-qualified tables in a SELECT
- statement, they must be aliased.
"""
- __dialect__ = mssql.dialect()
def setup(self):
metadata = MetaData()
- self.t1 = table('t1',
- column('a', Integer),
- column('b', String),
- column('c', String),
- )
+ self.t1 = table(
+ 't1',
+ column('a', Integer),
+ column('b', String),
+ column('c', String),
+ )
self.t2 = Table(
't2', metadata,
Column("a", Integer),
schema='schema'
)
+ def _assert_sql(self, element, legacy_sql, modern_sql=None):
+ dialect = mssql.dialect()
+
+ with assertions.expect_warnings(
+ "legacy_schema_aliasing flag is defaulted to True.*"):
+ self.assert_compile(
+ element,
+ legacy_sql,
+ dialect=dialect
+ )
+
+ dialect = mssql.dialect(legacy_schema_aliasing=False)
+ self.assert_compile(
+ element,
+ modern_sql or "foob",
+ dialect=dialect
+ )
+
+ def _legacy_dialect(self):
+ return mssql.dialect(legacy_schema_aliasing=True)
+
def test_result_map(self):
s = self.t2.select()
- c = s.compile(dialect=self.__dialect__)
+ c = s.compile(dialect=self._legacy_dialect())
assert self.t2.c.a in set(c._create_result_map()['a'][1])
def test_result_map_use_labels(self):
s = self.t2.select(use_labels=True)
- c = s.compile(dialect=self.__dialect__)
+ c = s.compile(dialect=self._legacy_dialect())
assert self.t2.c.a in set(c._create_result_map()['schema_t2_a'][1])
def test_straight_select(self):
- self.assert_compile(
+ self._assert_sql(
self.t2.select(),
- "SELECT t2_1.a, t2_1.b, t2_1.c FROM [schema].t2 AS t2_1"
+ "SELECT t2_1.a, t2_1.b, t2_1.c FROM [schema].t2 AS t2_1",
+ "SELECT [schema].t2.a, [schema].t2.b, "
+ "[schema].t2.c FROM [schema].t2"
)
def test_straight_select_use_labels(self):
- self.assert_compile(
+ self._assert_sql(
self.t2.select(use_labels=True),
"SELECT t2_1.a AS schema_t2_a, t2_1.b AS schema_t2_b, "
- "t2_1.c AS schema_t2_c FROM [schema].t2 AS t2_1"
+ "t2_1.c AS schema_t2_c FROM [schema].t2 AS t2_1",
+ "SELECT [schema].t2.a AS schema_t2_a, "
+ "[schema].t2.b AS schema_t2_b, "
+ "[schema].t2.c AS schema_t2_c FROM [schema].t2"
)
def test_join_to_schema(self):
t1, t2 = self.t1, self.t2
- self.assert_compile(
+ self._assert_sql(
t1.join(t2, t1.c.a == t2.c.a).select(),
"SELECT t1.a, t1.b, t1.c, t2_1.a, t2_1.b, t2_1.c FROM t1 "
- "JOIN [schema].t2 AS t2_1 ON t2_1.a = t1.a"
+ "JOIN [schema].t2 AS t2_1 ON t2_1.a = t1.a",
+
+ "SELECT t1.a, t1.b, t1.c, [schema].t2.a, [schema].t2.b, "
+ "[schema].t2.c FROM t1 JOIN [schema].t2 ON [schema].t2.a = t1.a"
)
def test_union_schema_to_non(self):
t1, t2 = self.t1, self.t2
s = select([t2.c.a, t2.c.b]).apply_labels().\
union(
- select([t1.c.a, t1.c.b]).apply_labels()
- ).alias().select()
- self.assert_compile(
+ select([t1.c.a, t1.c.b]).apply_labels()).alias().select()
+ self._assert_sql(
s,
"SELECT anon_1.schema_t2_a, anon_1.schema_t2_b FROM "
"(SELECT t2_1.a AS schema_t2_a, t2_1.b AS schema_t2_b "
"FROM [schema].t2 AS t2_1 UNION SELECT t1.a AS t1_a, "
+ "t1.b AS t1_b FROM t1) AS anon_1",
+
+ "SELECT anon_1.schema_t2_a, anon_1.schema_t2_b FROM "
+ "(SELECT [schema].t2.a AS schema_t2_a, [schema].t2.b AS "
+ "schema_t2_b FROM [schema].t2 UNION SELECT t1.a AS t1_a, "
"t1.b AS t1_b FROM t1) AS anon_1"
)
def test_column_subquery_to_alias(self):
a1 = self.t2.alias('a1')
s = select([self.t2, select([a1.c.a]).as_scalar()])
- self.assert_compile(
+ self._assert_sql(
s,
"SELECT t2_1.a, t2_1.b, t2_1.c, "
"(SELECT a1.a FROM [schema].t2 AS a1) "
- "AS anon_1 FROM [schema].t2 AS t2_1"
+ "AS anon_1 FROM [schema].t2 AS t2_1",
+
+ "SELECT [schema].t2.a, [schema].t2.b, [schema].t2.c, "
+ "(SELECT a1.a FROM [schema].t2 AS a1) AS anon_1 FROM [schema].t2"
)
@testing.provide_metadata
def test_insertid_schema(self):
meta = self.metadata
- con = testing.db.connect()
+ eng = engines.testing_engine(
+ options=dict(legacy_schema_aliasing=False))
+ meta.bind = eng
+ con = eng.connect()
+ con.execute('create schema paj')
+
+ @event.listens_for(meta, "after_drop")
+ def cleanup(target, connection, **kw):
+ connection.execute('drop schema paj')
+
+ tbl = Table('test', meta,
+ Column('id', Integer, primary_key=True), schema='paj')
+ tbl.create()
+ tbl.insert().execute({'id': 1})
+ eq_(tbl.select().scalar(), 1)
+
+ @testing.provide_metadata
+ def test_insertid_schema_legacy(self):
+ meta = self.metadata
+ eng = engines.testing_engine(
+ options=dict(legacy_schema_aliasing=True))
+ meta.bind = eng
+ con = eng.connect()
con.execute('create schema paj')
@event.listens_for(meta, "after_drop")
@testing.provide_metadata
def test_delete_schema(self):
meta = self.metadata
- con = testing.db.connect()
+ eng = engines.testing_engine(
+ options=dict(legacy_schema_aliasing=False))
+ meta.bind = eng
+ con = eng.connect()
+ con.execute('create schema paj')
+
+ @event.listens_for(meta, "after_drop")
+ def cleanup(target, connection, **kw):
+ connection.execute('drop schema paj')
+
+ tbl = Table(
+ 'test', meta,
+ Column('id', Integer, primary_key=True), schema='paj')
+ tbl.create()
+ tbl.insert().execute({'id': 1})
+ eq_(tbl.select().scalar(), 1)
+ tbl.delete(tbl.c.id == 1).execute()
+ eq_(tbl.select().scalar(), None)
+
+ @testing.provide_metadata
+ def test_delete_schema_legacy(self):
+ meta = self.metadata
+ eng = engines.testing_engine(
+ options=dict(legacy_schema_aliasing=True))
+ meta.bind = eng
+ con = eng.connect()
con.execute('create schema paj')
@event.listens_for(meta, "after_drop")