--- /dev/null
+.. change::
+ :tags: bug, mssql, orm
+
+ Enabled the "sane_rowcount" flag for the pymssql dialect, indicating
+ that the DBAPI now reports the correct number of rows affected from
+ an UPDATE or DELETE statement. This mostly impacts the ORM versioning
+ feature, in that it can now verify the number of rows affected when
+ it targets a row of an expected version.
\ No newline at end of file
from .. import util
-import sys
import re
supports_sane_rowcount_returning = False
supports_sane_multi_rowcount = False
- if util.py2k:
- # PyODBC unicode is broken on UCS-4 builds
- supports_unicode = sys.maxunicode == 65535
- supports_unicode_statements = supports_unicode
+ supports_unicode_statements = True
+ supports_unicode_binds = True
supports_native_decimal = True
default_paramstyle = 'named'
# hold the desired driver name
pyodbc_driver_name = None
- # will be set to True after initialize()
- # if the freetds.so is detected
- freetds = False
-
- # will be set to the string version of
- # the FreeTDS driver if freetds is detected
- freetds_driver_version = None
-
- # will be set to True after initialize()
- # if the libessqlsrv.so is detected
- easysoft = False
-
def __init__(self, supports_unicode_binds=None, **kw):
super(PyODBCConnector, self).__init__(**kw)
- self._user_supports_unicode_binds = supports_unicode_binds
+ if supports_unicode_binds is not None:
+ self.supports_unicode_binds = supports_unicode_binds
@classmethod
def dbapi(cls):
else:
return False
- def initialize(self, connection):
- # determine FreeTDS first. can't issue SQL easily
- # without getting unicode_statements/binds set up.
-
- pyodbc = self.dbapi
-
- dbapi_con = connection.connection
-
- _sql_driver_name = dbapi_con.getinfo(pyodbc.SQL_DRIVER_NAME)
- self.freetds = bool(re.match(r".*libtdsodbc.*\.so", _sql_driver_name
- ))
- self.easysoft = bool(re.match(r".*libessqlsrv.*\.so", _sql_driver_name
- ))
-
- if self.freetds:
- self.freetds_driver_version = dbapi_con.getinfo(
- pyodbc.SQL_DRIVER_VER)
-
- self.supports_unicode_statements = (
- not util.py2k or
- (not self.freetds and not self.easysoft)
- )
-
- if self._user_supports_unicode_binds is not None:
- self.supports_unicode_binds = self._user_supports_unicode_binds
- elif util.py2k:
- self.supports_unicode_binds = (
- not self.freetds or self.freetds_driver_version >= '0.91'
- ) and not self.easysoft
- else:
- self.supports_unicode_binds = True
-
- # run other initialization which asks for user name, etc.
- super(PyODBCConnector, self).initialize(connection)
def _dbapi_version(self):
if not self.dbapi:
Rowcount Support / ORM Versioning
---------------------------------
-The SQL Server drivers have very limited ability to return the number
-of rows updated from an UPDATE or DELETE statement. In particular, the
-pymssql driver has no support, whereas the pyodbc driver can only return
-this value under certain conditions.
-
-In particular, updated rowcount is not available when OUTPUT INSERTED
-is used. This impacts the SQLAlchemy ORM's versioning feature when
-server-side versioning schemes are used. When
-using pyodbc, the "implicit_returning" flag needs to be set to false
-for any ORM mapped class that uses a version_id column in conjunction with
-a server-side version generator::
+The SQL Server drivers may have limited ability to return the number
+of rows updated from an UPDATE or DELETE statement.
+
+As of this writing, the PyODBC driver is not able to return a rowcount when
+OUTPUT INSERTED is used. This impacts the SQLAlchemy ORM's versioning feature
+in many cases where server-side value generators are in use: while the
+versioning operations can succeed, the ORM cannot always check that an UPDATE
+or DELETE statement matched the number of rows expected, which is how it
+verifies that the version identifier matched. When this condition occurs, a
+warning will be emitted but the operation will proceed.
+
+The use of OUTPUT INSERTED can be disabled by setting the
+:paramref:`.Table.implicit_returning` flag to ``False`` on a particular
+:class:`.Table`, which in declarative looks like::
    class MyTable(Base):
        __tablename__ = 'mytable'
        id = Column(Integer, primary_key=True)

        __table_args__ = {
            'implicit_returning': False
        }
-Without the implicit_returning flag above, the UPDATE statement will
-use ``OUTPUT inserted.timestamp`` and the rowcount will be returned as
--1, causing the versioning logic to fail.
-
Enabling Snapshot Isolation
---------------------------
-Not necessarily specific to SQLAlchemy, SQL Server has a default transaction
+SQL Server has a default transaction
isolation mode that locks entire tables, and causes even mildly concurrent
applications to have long held locks and frequent deadlocks.
Enabling snapshot isolation for the database as a whole is recommended
Background on SQL Server snapshot isolation is available at
http://msdn.microsoft.com/en-us/library/ms175095.aspx.
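+
+Snapshot isolation is typically enabled at the database level; as a sketch,
+substituting the actual database name, the statements are the same two
+ALTER DATABASE commands issued by the test provisioning code in this
+change::
+
+    ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON
+
+    ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON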
-Known Issues
-------------
-
-* No support for more than one ``IDENTITY`` column per table
-* reflection of indexes does not work with versions older than
- SQL Server 2005
"""
import datetime
@compiles(_cast_on_2005)
def _compile(element, compiler, **kw):
from . import base
- if compiler.dialect.server_version_info < base.MS_2005_VERSION:
+ if compiler.dialect.server_version_info is None or \
+ compiler.dialect.server_version_info < base.MS_2005_VERSION:
return compiler.process(element.bindvalue, **kw)
else:
return compiler.process(cast(element.bindvalue, Unicode), **kw)
`FreeTDS <http://www.freetds.org/>`_. Compatible builds are available for
Linux, MacOSX and Windows platforms.
+Modern versions of this driver work very well with SQL Server and
+FreeTDS from Linux, and are highly recommended.
+
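+A minimal connection sketch, where every field is a placeholder::
+
+    engine = create_engine(
+        "mssql+pymssql://<username>:<password>@<host>:<port>/<dbname>")
+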
"""
from .base import MSDialect, MSIdentifierPreparer
from ... import types as sqltypes, util, processors
class MSDialect_pymssql(MSDialect):
- supports_sane_rowcount = False
+ supports_native_decimal = True
driver = 'pymssql'
preparer = MSIdentifierPreparer_pymssql
"the 1.0 series of the pymssql DBAPI.")
return module
- def __init__(self, **params):
- super(MSDialect_pymssql, self).__init__(**params)
- self.use_scope_identity = True
-
def _get_server_version_info(self, connection):
vers = connection.scalar("select @@version")
m = re.match(
else:
connection.autocommit(False)
super(MSDialect_pymssql, self).set_isolation_level(connection,
- level)
+ level)
+
dialect = MSDialect_pymssql
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
-Unicode Binds
--------------
-
-The current state of PyODBC on a unix backend with FreeTDS and/or
-EasySoft is poor regarding unicode; different OS platforms and versions of
-UnixODBC versus IODBC versus FreeTDS/EasySoft versus PyODBC itself
-dramatically alter how strings are received. The PyODBC dialect attempts to
-use all the information it knows to determine whether or not a Python unicode
-literal can be passed directly to the PyODBC driver or not; while SQLAlchemy
-can encode these to bytestrings first, some users have reported that PyODBC
-mis-handles bytestrings for certain encodings and requires a Python unicode
-object, while the author has observed widespread cases where a Python unicode
-is completely misinterpreted by PyODBC, particularly when dealing with
-the information schema tables used in table reflection, and the value
-must first be encoded to a bytestring.
-
-It is for this reason that whether or not unicode literals for bound
-parameters be sent to PyODBC can be controlled using the
-``supports_unicode_binds`` parameter to ``create_engine()``. When
-left at its default of ``None``, the PyODBC dialect will use its
-best guess as to whether or not the driver deals with unicode literals
-well. When ``False``, unicode literals will be encoded first, and when
-``True`` unicode literals will be passed straight through. This is an interim
-flag that hopefully should not be needed when the unicode situation stabilizes
-for unix + PyODBC.
-
-.. versionadded:: 0.7.7
- ``supports_unicode_binds`` parameter to ``create_engine()``\ .
+Driver / Unicode Support
+-------------------------
+
+PyODBC works best with Microsoft ODBC drivers, particularly in the area
+of Unicode support on both Python 2 and Python 3.
+
+Using the FreeTDS ODBC drivers on Linux or OSX with PyODBC is **not**
+recommended; there have historically been many Unicode-related issues
+in this area, including before Microsoft offered ODBC drivers for Linux
+and OSX. Now that Microsoft offers ODBC drivers for all platforms, these
+are the recommended drivers for use with PyODBC. FreeTDS remains relevant
+for non-ODBC drivers such as pymssql, where it works very well.
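+
+As a sketch, a hostname-based connection naming a Microsoft ODBC driver
+might look like the following; the driver name varies with the installed
+version::
+
+    engine = create_engine(
+        "mssql+pyodbc://scott:tiger@myhost:port/databasename"
+        "?driver=ODBC+Driver+13+for+SQL+Server")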
+
Rowcount Support
----------------
def _get_server_version_info(self, connection):
try:
- raw = connection.scalar("SELECT SERVERPROPERTY('ProductVersion')")
+ raw = connection.scalar(
+ "SELECT CAST(SERVERPROPERTY('ProductVersion') AS VARCHAR)")
except exc.DBAPIError:
# SQL Server docs indicate this function isn't present prior to
- # 2008; additionally, unknown combinations of pyodbc aren't
- # able to run this query.
+ # 2008. Before we had the VARCHAR cast above, pyodbc would also
+ # fail on this query.
return super(MSDialect_pyodbc, self).\
_get_server_version_info(connection)
else:
def _is_mariadb(self):
return 'MariaDB' in self.server_version_info
+ @property
+ def _mariadb_normalized_version_info(self):
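+ # MariaDB 10.x can report a "5.5.5-" compatibility prefix, which parses
+ # to a tuple like (5, 5, 5, 10, 2, 8); when present, the real MariaDB
+ # version starts at index 3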
+ if len(self.server_version_info) > 5:
+ return self.server_version_info[3:]
+ else:
+ return self.server_version_info
+
@property
def _supports_cast(self):
return self.server_version_info is None or \
db_opts['_retry_on_12516'] = True
-def reap_oracle_dbs(idents_file):
- log.info("Reaping Oracle dbs...")
+def reap_dbs(idents_file):
+ log.info("Reaping databases...")
+
+ urls = collections.defaultdict(set)
+ idents = collections.defaultdict(set)
- urls = collections.defaultdict(list)
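+ # each line of the idents file is expected to be a "<ident> <url>"
+ # pair written out by the test provisioning system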
with open(idents_file) as file_:
for line in file_:
line = line.strip()
db_name, db_url = line.split(" ")
- urls[db_url].append(db_name)
-
- for url in urls:
- if not url.startswith("oracle"):
- continue
- idents = urls[url]
- log.info("db reaper connecting to %r", url)
- eng = create_engine(url)
- with eng.connect() as conn:
-
- log.info("identifiers in file: %s", ", ".join(idents))
-
- to_reap = conn.execute(
- "select u.username from all_users u where username "
- "like 'TEST_%' and not exists (select username "
- "from v$session where username=u.username)")
- all_names = {username.lower() for (username, ) in to_reap}
- to_drop = set()
- for name in all_names:
- if name.endswith("_ts1") or name.endswith("_ts2"):
- continue
- elif name in idents:
- to_drop.add(name)
- if "%s_ts1" % name in all_names:
- to_drop.add("%s_ts1" % name)
- if "%s_ts2" % name in all_names:
- to_drop.add("%s_ts2" % name)
-
- dropped = total = 0
- for total, username in enumerate(to_drop, 1):
- if _ora_drop_ignore(conn, username):
- dropped += 1
- log.info(
- "Dropped %d out of %d stale databases detected",
- dropped, total)
+ url_obj = sa_url.make_url(db_url)
+ url_key = (url_obj.get_backend_name(), url_obj.host)
+ urls[url_key].add(db_url)
+ idents[url_key].add(db_name)
+
+ for url_key in urls:
+ backend = url_key[0]
+ url = list(urls[url_key])[0]
+ ident = idents[url_key]
+ if backend == "oracle":
+ _reap_oracle_dbs(url, ident)
+ elif backend == "mssql":
+ _reap_mssql_dbs(url, ident)
+
+def _reap_oracle_dbs(url, idents):
+ log.info("db reaper connecting to %r", url)
+ eng = create_engine(url)
+ with eng.connect() as conn:
+
+ log.info("identifiers in file: %s", ", ".join(idents))
+
+ to_reap = conn.execute(
+ "select u.username from all_users u where username "
+ "like 'TEST_%' and not exists (select username "
+ "from v$session where username=u.username)")
+ all_names = {username.lower() for (username, ) in to_reap}
+ to_drop = set()
+ for name in all_names:
+ if name.endswith("_ts1") or name.endswith("_ts2"):
+ continue
+ elif name in idents:
+ to_drop.add(name)
+ if "%s_ts1" % name in all_names:
+ to_drop.add("%s_ts1" % name)
+ if "%s_ts2" % name in all_names:
+ to_drop.add("%s_ts2" % name)
+
+ dropped = total = 0
+ for total, username in enumerate(to_drop, 1):
+ if _ora_drop_ignore(conn, username):
+ dropped += 1
+ log.info(
+ "Dropped %d out of %d stale databases detected",
+ dropped, total)
+
@_follower_url_from_main.for_db("oracle")
return url
+@_create_db.for_db("mssql")
+def _mssql_create_db(cfg, eng, ident):
+ with eng.connect().execution_options(
+ isolation_level="AUTOCOMMIT") as conn:
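+ # SQL Server disallows CREATE DATABASE / ALTER DATABASE inside a
+ # transaction, hence the AUTOCOMMIT isolation level above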
+ conn.execute("create database %s" % ident)
+ conn.execute(
+ "ALTER DATABASE %s SET ALLOW_SNAPSHOT_ISOLATION ON" % ident)
+ conn.execute(
+ "ALTER DATABASE %s SET READ_COMMITTED_SNAPSHOT ON" % ident)
+ conn.execute("use %s" % ident)
+ conn.execute("create schema test_schema")
+
+
+@_drop_db.for_db("mssql")
+def _mssql_drop_db(cfg, eng, ident):
+ with eng.connect().execution_options(
+ isolation_level="AUTOCOMMIT") as conn:
+ _mssql_drop_ignore(conn, ident)
+
+def _mssql_drop_ignore(conn, ident):
+ try:
+ # typically when this happens, we can't KILL the session anyway,
+ # so let the cleanup process drop the DBs
+ # for row in conn.execute("select session_id from sys.dm_exec_sessions "
+ # "where database_id=db_id('%s')" % ident):
+ # log.info("killing SQL server sesssion %s", row['session_id'])
+ # conn.execute("kill %s" % row['session_id'])
+
+ conn.execute("drop database %s" % ident)
+ log.info("Reaped db: %s", ident)
+ return True
+ except exc.DatabaseError as err:
+ log.warning("couldn't drop db: %s", err)
+ return False
+
+
+def _reap_mssql_dbs(url, idents):
+ log.info("db reaper connecting to %r", url)
+ eng = create_engine(url)
+ with eng.connect().execution_options(
+ isolation_level="AUTOCOMMIT") as conn:
+
+ log.info("identifiers in file: %s", ", ".join(idents))
+
+ to_reap = conn.execute(
+ "select d.name from sys.databases as d where name "
+ "like 'TEST_%' and not exists (select session_id "
+ "from sys.dm_exec_sessions "
+ "where database_id=d.database_id)")
+ all_names = {dbname.lower() for (dbname, ) in to_reap}
+ to_drop = set()
+ for name in all_names:
+ if name in idents:
+ to_drop.add(name)
+
+ dropped = total = 0
+ for total, dbname in enumerate(to_drop, 1):
+ if _mssql_drop_ignore(conn, dbname):
+ dropped += 1
+ log.info(
+ "Dropped %d out of %d stale databases detected",
+ dropped, total)
"""Target database must support window functions."""
return exclusions.closed()
+ @property
+ def ctes(self):
+ """Target database supports CTEs"""
+
+ return exclusions.closed()
+
+ @property
+ def ctes_on_dml(self):
+ """target database supports CTES which consist of INSERT, UPDATE
+ or DELETE"""
+
+ return exclusions.closed()
+
@property
def autoincrement_insert(self):
"""target platform generates new surrogate integer primary key values
"""
return exclusions.closed()
+ @property
+ def nested_aggregates(self):
+ """target database can select an aggregate from a subquery that's
+ also using an aggregate
+
+ """
+ return exclusions.open()
+
+ @property
+ def recursive_fk_cascade(self):
+ """target database must support ON DELETE CASCADE on a self-referential
+ foreign key
+
+ """
+ return exclusions.open()
+
@property
def precision_numerics_retains_significant_digits(self):
"""A precision numeric type will return empty significant digits,
[(2, 2, 3), (3, 3, 4)]
)
+ @testing.requires.order_by_col_from_union
@testing.requires.parens_in_union_contained_select_w_limit_offset
def test_limit_offset_selectable_in_unions(self):
table = self.tables.some_table
-"""Drop Oracle databases that are left over from a
+"""Drop Oracle, SQL Server databases that are left over from a
multiprocessing test run.
Currently the cx_Oracle driver seems to sometimes not release a
TCP connection even if close() is called, which prevents the provisioning
system from dropping a database in-process.
+For SQL Server, databases can remain in use after tests run, and
+killing all detected sessions does not seem to release the database
+in-process.
+
"""
from sqlalchemy.testing import provision
import logging
logging.basicConfig()
logging.getLogger(provision.__name__).setLevel(logging.INFO)
-provision.reap_oracle_dbs(sys.argv[1])
+provision.reap_dbs(sys.argv[1])
class MemUsageWBackendTest(EnsureZeroed):
__tags__ = 'memory_intensive',
- __requires__ = 'cpython',
+ __requires__ = 'cpython', 'memory_process_intensive'
__backend__ = True
# ensure a pure growing test trips the assertion
from sqlalchemy import testing
from sqlalchemy.util import ue
from sqlalchemy import util
-from sqlalchemy.testing.assertsql import CursorSQL
+from sqlalchemy.testing.assertsql import CursorSQL, DialectSQL
from sqlalchemy import Integer, String, Table, Column, select, MetaData,\
func, PrimaryKeyConstraint, desc, Sequence, DDL, ForeignKey, or_, and_
from sqlalchemy import event
__only_on__ = 'mssql'
__backend__ = True
+ @testing.requires.mssql_freetds
+ @testing.requires.python2
def test_convert_unicode(self):
meta = MetaData(testing.db)
t1 = Table(
meta.drop_all()
@testing.provide_metadata
- def test_disable_scope_identity(self):
+ def _test_disable_scope_identity(self):
engine = engines.testing_engine(options={"use_scope_identity": False})
metadata = self.metadata
t1 = Table(
with self.sql_execution_asserter(engine) as asserter:
engine.execute(t1.insert(), {"data": "somedata"})
+ # TODO: need a DialectSQL assertion that acts like CursorSQL
asserter.assert_(
- CursorSQL(
- "INSERT INTO t1 (data) VALUES (?)",
- ("somedata", )
+ DialectSQL(
+ "INSERT INTO t1 (data) VALUES (:data)",
+ {"data": "somedata"}
),
CursorSQL("SELECT @@identity AS lastrowid"),
)
dbname = testing.db.scalar("select db_name()")
owner = testing.db.scalar("SELECT user_name()")
+ referred_schema = '%(dbname)s.%(owner)s' % {
+ "dbname": dbname, "owner": owner}
inspector = inspect(testing.db)
bar_via_db = inspector.get_foreign_keys(
- "bar", schema="%s.%s" % (dbname, owner))
+ "bar", schema=referred_schema)
eq_(
bar_via_db,
[{
'referred_table': 'foo',
'referred_columns': ['id'],
- 'referred_schema': 'test.dbo',
+ 'referred_schema': referred_schema,
'name': 'fkfoo',
'constrained_columns': ['foo_id']}]
)
- assert testing.db.has_table("bar", schema="test.dbo")
+ assert testing.db.has_table("bar", schema=referred_schema)
m2 = MetaData()
- Table('bar', m2, schema="test.dbo", autoload=True,
- autoload_with=testing.db)
- eq_(m2.tables["test.dbo.foo"].schema, "test.dbo")
+ Table('bar', m2, schema=referred_schema, autoload=True,
+ autoload_with=testing.db)
+ eq_(m2.tables["%s.foo" % referred_schema].schema, referred_schema)
@testing.provide_metadata
def test_indexes_cols(self):
Date, Time, DateTime, DefaultClause, PickleType, text, Text, \
UnicodeText, LargeBinary
from sqlalchemy import types, schema
+from sqlalchemy import util
from sqlalchemy.databases import mssql
from sqlalchemy.dialects.mssql.base import TIME, _MSDate
from sqlalchemy.dialects.mssql.base import MS_2005_VERSION, MS_2008_VERSION
class MSDateTypeTest(fixtures.TestBase):
+ __only_on__ = 'mssql'
+ __backend__ = True
def test_result_processor(self):
expected = datetime.date(2000, 1, 2)
fixtures.TestBase, AssertsExecutionResults, ComparesTables):
__only_on__ = 'mssql'
+ __backend__ = True
+
@classmethod
def setup_class(cls):
global metadata
def teardown(self):
metadata.drop_all()
- @testing.fails_on_everything_except(
- 'mssql+pyodbc',
- 'mssql+mxodbc')
def test_decimal_notation(self):
numeric_table = Table(
'numeric_table', metadata,
engine.execute(tbl.delete())
-class MonkeyPatchedBinaryTest(fixtures.TestBase):
- __only_on__ = 'mssql+pymssql'
-
- def test_unicode(self):
- module = __import__('pymssql')
- result = module.Binary('foo')
- eq_(result, 'foo')
-
- def test_bytes(self):
- module = __import__('pymssql')
- input = b('\x80\x03]q\x00X\x03\x00\x00\x00oneq\x01a.')
- expected_result = input
- result = module.Binary(input)
- eq_(result, expected_result)
-
-
binary_table = None
MyPickleType = None
"""Test the Binary and VarBinary types"""
__only_on__ = 'mssql'
+ __requires__ = "non_broken_binary",
+ __backend__ = True
@classmethod
def setup_class(cls):
binary_table.create(engine)
return binary_table
+ def test_character_binary(self):
+ engine = testing.db
+ binary_table = self._fixture(engine)
+ with engine.connect() as conn:
+ conn.execute(
+ binary_table.insert(),
+ primary_id=1,
+ data=b("some normal data")
+ )
+
def test_binary_legacy_types(self):
self._test_binary(False)
# the type we used here is 100 bytes
# so we will get 100 bytes zero-padded
paddedstream = list(stream2[0:99])
- paddedstream.extend(['\x00'] * (100 - len(paddedstream)))
+ if util.py3k:
+ paddedstream.extend([0] * (100 - len(paddedstream)))
+ else:
+ paddedstream.extend(['\x00'] * (100 - len(paddedstream)))
eq_(
list(row['data_slice']), paddedstream
)
@testing.fails_on_everything_except(
'postgresql+psycopg2', 'postgresql+psycopg2cffi',
'postgresql+pypostgresql', 'postgresql+pygresql',
- 'mysql+mysqlconnector', 'mysql+pymysql', 'mysql+cymysql')
+ 'mysql+mysqlconnector', 'mysql+pymysql', 'mysql+cymysql',
+ 'mssql+pymssql')
def test_raw_python(self):
def go(conn):
conn.execute(
test_needs_autoincrement=True)
b_id = Column(Integer, ForeignKey('b.id'))
- @testing.fails_on("oracle",
- "seems like oracle's query engine can't "
- "handle this, not clear if there's an "
- "expression-level bug on our end though")
+ @testing.fails_on(["oracle", "mssql"],
+ "Oracle / SQL Server engines can't handle this, "
+ "not clear if there's an expression-level bug on our "
+ "end though")
def test_join_w_eager_w_any(self):
A, B, C, D, E = (self.classes.A,
self.classes.B,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('b_id', Integer,
- ForeignKey('b.id', use_alter=True, name='b')))
+ ForeignKey('b.id', use_alter=True, name='b_fk')))
Table('b', metadata,
Column('id', Integer, ForeignKey('a.id'), primary_key=True))
select([func.count('*')]).select_from(
sess.query(Person).with_polymorphic('*')
.options(joinedload(Engineer.machines))
- .limit(2).offset(1).with_labels().subquery()
+ .order_by(Person.person_id).limit(2).offset(1)
+ .with_labels().subquery()
).scalar(), 2)
def test_get_one(self):
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
-
class EagerTest(fixtures.MappedTest):
run_deletes = None
run_inserts = "once"
@classmethod
def define_tables(cls, metadata):
- if testing.db.dialect.supports_native_boolean:
- false = 'false'
- else:
- false = "0"
-
- cls.other['false'] = false
-
Table('owners', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
primary_key=True),
Column('owner_id', Integer, ForeignKey('owners.id'),
primary_key=True),
- Column('someoption', sa.Boolean, server_default=false,
+ Column('someoption', sa.Boolean, server_default=sa.false(),
nullable=False))
@classmethod
@testing.crashes('sybase', 'FIXME: unknown, verify not fails_on')
def test_without_outerjoin_literal(self):
- Thing, tests, false = (self.classes.Thing,
- self.tables.tests,
- self.other.false)
+ Thing, tests = (self.classes.Thing,
+ self.tables.tests)
s = create_session()
q = s.query(Thing).options(sa.orm.joinedload('category'))
result = (q.filter(
(tests.c.owner_id == 1) &
text(
- 'options.someoption is null or options.someoption=%s' %
- false)).join('owner_option'))
+ 'options.someoption is null or options.someoption=:opt'
+ ).bindparams(opt=False)).join('owner_option'))
result_str = ["%d %s" % (t.id, t.category.name) for t in result]
eq_(result_str, ['3 Some Category'])
g2 = sess.query(Graph).get(Version(g.id, g.version_id))
eq_(g.version, g2.version)
- @testing.fails_on('mssql', 'Cannot update identity columns.')
def test_pk_mutation(self):
Graph, Version = self.classes.Graph, self.classes.Version
order_by(User.id, adalias.c.id)
def go():
- eq_(self.static.user_address_result, q.order_by(User.id).all())
+ eq_(self.static.user_address_result, q.all())
self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
self._assert_not_hasparent(a1)
+ @testing.requires.updateable_autoincrement_pks
@testing.requires.predictable_gc
def test_stale_state_positive_pk_change(self):
"""Illustrate that we can't easily link a
(Transition, [{'name': 'transition1'},
{'name': 'transition2'}])})
+ @testing.requires.updateable_autoincrement_pks
@testing.requires.sane_multi_rowcount
def test_stale_conditions(self):
Place, Transition, place_input, place, transition = (
User = self.classes.User
sess = create_session()
- q = sess.query(User)
+ q = sess.query(User).order_by(User.id)
self.assert_sql(
testing.db, lambda: q[10:20], [
(
"SELECT users.id AS users_id, users.name "
- "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
+ "AS users_name FROM users ORDER BY users.id "
+ "LIMIT :param_1 OFFSET :param_2",
{'param_1': 10, 'param_2': 10})])
self.assert_sql(
testing.db, lambda: q[:20], [
(
"SELECT users.id AS users_id, users.name "
- "AS users_name FROM users LIMIT :param_1",
+ "AS users_name FROM users ORDER BY users.id "
+ "LIMIT :param_1",
{'param_1': 20})])
self.assert_sql(
testing.db, lambda: q[5:], [
(
"SELECT users.id AS users_id, users.name "
- "AS users_name FROM users LIMIT -1 OFFSET :param_1",
+ "AS users_name FROM users ORDER BY users.id "
+ "LIMIT -1 OFFSET :param_1",
{'param_1': 5})])
self.assert_sql(testing.db, lambda: q[2:2], [])
testing.db, lambda: q[-5:-2], [
(
"SELECT users.id AS users_id, users.name AS users_name "
- "FROM users", {})])
+ "FROM users ORDER BY users.id", {})])
self.assert_sql(
testing.db, lambda: q[-5:], [
(
"SELECT users.id AS users_id, users.name AS users_name "
- "FROM users", {})])
+ "FROM users ORDER BY users.id", {})])
self.assert_sql(
testing.db, lambda: q[:], [
(
"SELECT users.id AS users_id, users.name AS users_name "
- "FROM users", {})])
+ "FROM users ORDER BY users.id", {})])
class FilterTest(QueryTest, AssertsCompiledSQL):
with self._assert_bind_args(session):
session.query(func.max(User.score)).scalar()
+
+ @testing.requires.nested_aggregates
def test_column_property_select(self):
User = self.classes.User
Address = self.classes.Address
from __future__ import with_statement
from sqlalchemy import (
testing, exc as sa_exc, event, String, Column, Table, select, func)
+from sqlalchemy.sql import elements
from sqlalchemy.testing import (
fixtures, engines, eq_, assert_raises, assert_raises_message,
assert_warnings, mock, expect_warnings, is_, is_not_)
from test.orm._fixtures import FixtureTest
from sqlalchemy import inspect
-
class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
run_inserts = None
__backend__ = True
def prevent_savepoint_rollback(
cursor, statement, parameters, context=None):
- if "rollback to savepoint" in statement.lower():
+ if context is not None and context.compiled and isinstance(
+ context.compiled.statement,
+ elements.RollbackToSavepointClause):
raise rollback_error
self.event_listen(
def prevent_savepoint_rollback(
cursor, statement, parameters, context=None):
- if "rollback to savepoint" in statement.lower():
+ if context is not None and context.compiled and isinstance(
+ context.compiled.statement,
+ elements.RollbackToSavepointClause):
raise rollback_error
self.event_listen(testing.db, "handle_error", canary, retval=True)
def teardown_class(cls):
super(UnicodeSchemaTest, cls).teardown_class()
- @testing.fails_on(
- 'mssql+pyodbc',
- 'pyodbc returns a non unicode encoding of the results description.')
def test_mapping(self):
t2, t1 = self.tables.t2, self.tables.t1
assert new_a1.t2s[0].d == b1.d
session.expunge_all()
- @testing.fails_on(
- 'mssql+pyodbc',
- 'pyodbc returns a non unicode encoding of the results description.')
def test_inheritance_mapping(self):
t2, t1 = self.tables.t2, self.tables.t1
class Foo(cls.Basic):
pass
+ @testing.requires.non_broken_binary
def test_binary_equality(self):
Foo, t1 = self.classes.Foo, self.tables.t1
- data = b("this is some data")
+ # use a short binary value containing a non-ascii byte; drivers with
+ # broken binary handling fail on values like this
+ data = b'm\x18'
mapper(Foo, t1)
class BatchDeleteIgnoresRowcountTest(fixtures.DeclarativeMappedTest):
- __requires__ = ('foreign_keys',)
+ __requires__ = ('foreign_keys', 'recursive_fk_cascade')
@classmethod
def setup_classes(cls):
sess.flush
)
- @testing.requires.sane_multi_rowcount
+ @testing.requires.sane_rowcount
def test_delete_twice(self):
Parent, Child = self._fixture()
sess = Session()
class ColumnTypeTest(fixtures.MappedTest):
__backend__ = True
+ __requires__ = 'sane_rowcount',
@classmethod
def define_tables(cls, metadata):
class AlternateGeneratorTest(fixtures.MappedTest):
__backend__ = True
+ __requires__ = 'sane_rowcount',
@classmethod
def define_tables(cls, metadata):
class ManualInheritanceVersionTest(fixtures.MappedTest):
run_define_tables = 'each'
__backend__ = True
+ __requires__ = 'sane_rowcount',
@classmethod
def define_tables(cls, metadata):
def mysql_not_mariadb_102(config):
return against(config, "mysql") and (
not config.db.dialect._is_mariadb or
- config.db.dialect.server_version_info < (5, 5, 5, 10, 2)
+ config.db.dialect._mariadb_normalized_version_info < (10, 2)
)
return self.check_constraints + fails_on(
return fails_on_everything_except('sqlite', 'oracle', '+zxjdbc') + \
skip_if('mssql')
+ @property
+ def recursive_fk_cascade(self):
+ """target database must support ON DELETE CASCADE on a self-referential
+ foreign key"""
+
+ return skip_if(["mssql"])
+
@property
def deferrable_fks(self):
"""target database must support deferrable fks"""
["firebird"], "not supported"
)
+ @property
+ def non_broken_binary(self):
+ """target DBAPI must work fully with binary values"""
+
+ # see https://github.com/pymssql/pymssql/issues/504
+ return skip_if(["mssql+pymssql"])
+
@property
def binary_comparisons(self):
"""target database/driver can allow BLOB/BINARY fields to be compared
return skip_if(
[
- "mssql+pyodbc",
- "mssql+mxodbc",
- "mysql+mysqldb",
- "mysql+pymysql"], "no driver support"
+ "mssql",
+ "mysql"], "no driver support"
)
@property
"SQL Server 2005+ is required for "
"independent connections")])
+ @property
+ def memory_process_intensive(self):
+ """Driver is able to handle the memory tests which run in a subprocess
+ and iterate through hundreds of connections
+
+ """
+ return skip_if([
+ no_support("oracle", "Oracle XE usually can't handle these"),
+ no_support("mssql+pyodbc", "MS ODBC drivers struggle")
+ ])
+
@property
def updateable_autoincrement_pks(self):
"""Target must support UPDATE on autoincrement/integer primary key."""
@property
def savepoints_w_release(self):
return self.savepoints + skip_if(
- "oracle", "oracle doesn't support release of savepoint")
+ ["oracle", "mssql"],
+ "database doesn't support release of savepoint"
+ )
+
@property
def schemas(self):
['postgresql', 'mssql']
)
+ @property
+ def ctes_on_dml(self):
+ """target database supports CTES which consist of INSERT, UPDATE
+ or DELETE"""
+
+ return only_if(
+ ['postgresql']
+ )
+
@property
def mod_operator_as_percent_sign(self):
"""target database must use a plain percent '%' as the 'modulus'
"firebird", "mysql", "sybase",
], 'no support for EXCEPT')
+ @property
+ def order_by_col_from_union(self):
+ """target database supports ordering by a column from a SELECT
+ inside of a UNION
+
+ E.g. (SELECT id, ...) UNION (SELECT id, ...) ORDER BY id
+
+ Fails on SQL Server
+
+ """
+ return fails_if('mssql')
+
@property
def parens_in_union_contained_select_w_limit_offset(self):
"""Target database must support parenthesized SELECT in UNION
when LIMIT/OFFSET is specifically present.
- E.g. (SELECT ...) UNION (SELECT ..)
+ E.g. (SELECT ... LIMIT ..) UNION (SELECT .. OFFSET ..)
This is known to fail on SQLite.
"""Target database must support parenthesized SELECT in UNION
when OFFSET/LIMIT is specifically not present.
- E.g. (SELECT ... LIMIT ..) UNION (SELECT .. OFFSET ..)
+ E.g. (SELECT ...) UNION (SELECT ..)
This is known to fail on SQLite. It also fails on Oracle
because without LIMIT/OFFSET, there is currently no step that
util.py2k,
"bug in mysqlconnector 2.0"
),
- LambdaPredicate(
- lambda config: against(config, 'mssql+pyodbc') and
- config.db.dialect.freetds and
- config.db.dialect.freetds_driver_version < "0.91",
- "older freetds doesn't support unicode DDL"
- ),
exclude('mysql', '<', (4, 1, 1), 'no unicode connection support'),
])
return fails_on_everything_except('postgresql', 'oracle', 'mssql',
'sybase', 'sqlite')
+ @property
+ def nested_aggregates(self):
+ """target database can select an aggregate from a subquery that's
+ also using an aggregate"""
+
+ return skip_if(["mssql"])
+
@property
def array_type(self):
return only_on([
('sqlite', None, None, 'TODO'),
("firebird", None, None, "Precision must be from 1 to 18"),
("sybase+pysybase", None, None, "TODO"),
- ('mssql+pymssql', None, None,
- 'FIXME: improve pymssql dec handling')]
+ ]
)
@property
@property
def mssql_freetds(self):
- return only_on(
- LambdaPredicate(
- lambda config: (
- (against(config, 'mssql+pyodbc') and
- config.db.dialect.freetds)
- or against(config, 'mssql+pymssql')
- )
- )
- )
+ return only_on(["mssql+pymssql"])
@property
def ad_hoc_engines(self):
expected
)
+ @testing.requires.ctes_on_dml
def test_update_in_select(self):
self._test_a_in_b("update", "select")
+ @testing.requires.ctes_on_dml
def test_delete_in_select(self):
self._test_a_in_b("update", "select")
+ @testing.requires.ctes_on_dml
def test_insert_in_select(self):
self._test_a_in_b("update", "select")
{'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"}
)
+ @testing.requires.sequences
def test_lastrow_accessor_four(self):
metadata = MetaData()
self._test_lastrow_accessor(
@testing.fails_on('firebird', "uses sql-92 rules")
@testing.fails_on('sybase', "uses sql-92 rules")
- @testing.fails_if(
- lambda: testing.against('mssql+pyodbc') and not
- testing.db.dialect.freetds, "uses sql-92 rules")
+ @testing.skip_if(['mssql'])
def test_bind_in(self):
"""test calling IN against a bind parameter.
eq_(result.fetchone(), None)
assert connection.closed
+ @testing.requires.updateable_autoincrement_pks
def test_connectionless_autoclose_no_metadata(self):
result = testing.db.execute("update users set user_id=5")
connection = result.connection
def get_col_spec(self):
return "BAR"
+ t = Table('t', MetaData(), Column('bar', MyType, nullable=False))
+
self.assert_compile(
- ddl.CreateColumn(Column('bar', MyType)),
- "bar FOOB bar"
+ ddl.CreateColumn(t.c.bar),
+ "bar FOOB bar NOT NULL"
)
+
+ t = Table('t', MetaData(),
+ Column('bar', MyOtherType, nullable=False))
self.assert_compile(
- ddl.CreateColumn(Column('bar', MyOtherType)),
- "bar BAR"
+ ddl.CreateColumn(t.c.bar),
+ "bar BAR NOT NULL"
)
def test_typedecorator_literal_render_fallback_bound(self):
Table(
'non_native_enum_table', metadata,
- Column("id", Integer, primary_key=True),
+ Column("id", Integer, primary_key=True, autoincrement=False),
Column('someenum', Enum('one', 'two', 'three', native_enum=False)),
Column('someotherenum',
Enum('one', 'two', 'three',
@testing.requires.enforces_check_constraints
def test_check_constraint(self):
assert_raises(
- (exc.IntegrityError, exc.ProgrammingError),
+ (exc.IntegrityError, exc.ProgrammingError, exc.OperationalError),
testing.db.execute,
"insert into non_native_enum_table "
"(id, someenum) values(1, 'four')")
def teardown_class(cls):
metadata.drop_all()
+ @testing.requires.non_broken_binary
def test_round_trip(self):
testobj1 = pickleable.Foo('im foo 1')
testobj2 = pickleable.Foo('im foo 2')
return "FOOB %s" % kw['type_expression'].name
m = MetaData()
- t = Table('t', m, Column('bar', MyType))
+ t = Table('t', m, Column('bar', MyType, nullable=False))
self.assert_compile(
ddl.CreateColumn(t.c.bar),
- "bar FOOB bar"
+ "bar FOOB bar NOT NULL"
)
sqlite: SQLITE={env:TOX_SQLITE:--db sqlite}
postgresql: POSTGRESQL={env:TOX_POSTGRESQL:--db postgresql}
mysql: MYSQL={env:TOX_MYSQL:--db mysql --db pymysql}
- oracle: ORACLE={env:TOX_ORACLE:--db oracle} --write-idents oracle_idents.txt --nomemory
- mssql: MSSQL={env:TOX_MSSQL:--db pyodbc --db pymssql}
+ oracle: ORACLE={env:TOX_ORACLE:--db oracle}
+ mssql: MSSQL={env:TOX_MSSQL:--db mssql --db mssql_pymssql}
+ oracle,mssql: IDENTS=--write-idents db_idents.txt
+ oracle,mssql: NOMEMORY=--nomemory
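+# oracle/mssql runs can leave stale databases behind under multiprocessing;
+# reap_dbs.py, run below against the idents file, drops them afterwards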
backendonly: BACKENDONLY=--backend-only
# tox as of 2.0 blocks all environment variables from the
# for nocext, we rm *.so in lib in case we are doing usedevelop=True
commands=
- {nocext}: sh -c "rm -f lib/sqlalchemy/*.so"
- {env:BASECOMMAND} {env:WORKERS} {env:SQLITE:} {env:POSTGRESQL:} {env:MYSQL:} {env:ORACLE:} {env:MSSQL:} {env:BACKENDONLY:} {env:COVERAGE:} {posargs}
- {oracle}: python reap_oracle_dbs.py oracle_idents.txt
-
+ nocext: sh -c "rm -f lib/sqlalchemy/*.so"
+ {env:BASECOMMAND} {env:WORKERS} {env:SQLITE:} {env:POSTGRESQL:} {env:MYSQL:} {env:ORACLE:} {env:MSSQL:} {env:BACKENDONLY:} {env:IDENTS:} {env:NOMEMORY:} {env:COVERAGE:} {posargs}
+ oracle,mssql: python reap_dbs.py db_idents.txt
[testenv:pep8]
deps=flake8