--- /dev/null
+.. change::
+ :tags: bug, mssql
+ :tickets: 9414
+
+ Fixed issue in the new :class:`.Uuid` datatype which prevented it from
+ working with the pymssql driver. As pymssql seems to be maintained again,
+ restored testing support for pymssql.
+
+.. change::
+ :tags: bug, mssql
+
+ Tweaked the pymssql dialect to take better advantage of
+ RETURNING for INSERT statements in order to retrieve last inserted primary
+ key values, in the same way as occurs for the mssql+pyodbc dialect right
+ now.
+
+.. change::
+ :tags: bug, orm
+
+ Identified that the ``sqlite`` and ``mssql+pyodbc`` dialects are now
+ compatible with the SQLAlchemy ORM's "versioned rows" feature, since
+ SQLAlchemy now computes rowcount for a RETURNING statement in this specific
+ case by counting the rows returned, rather than relying upon
+ ``cursor.rowcount``. In particular, the ORM versioned rows use case
+ (documented at :ref:`mapper_version_counter`) should now be fully
+ supported with the SQL Server pyodbc dialect.
+
of rows updated from an UPDATE or DELETE statement.
As of this writing, the PyODBC driver is not able to return a rowcount when
-OUTPUT INSERTED is used. This impacts the SQLAlchemy ORM's versioning feature
-in many cases where server-side value generators are in use in that while the
-versioning operations can succeed, the ORM cannot always check that an UPDATE
-or DELETE statement matched the number of rows expected, which is how it
-verifies that the version identifier matched. When this condition occurs, a
-warning will be emitted but the operation will proceed.
-
-The use of OUTPUT INSERTED can be disabled by setting the
-:paramref:`_schema.Table.implicit_returning` flag to ``False`` on a particular
-:class:`_schema.Table`, which in declarative looks like::
-
- class MyTable(Base):
- __tablename__ = 'mytable'
- id = Column(Integer, primary_key=True)
- stuff = Column(String(10))
- timestamp = Column(TIMESTAMP(), default=text('DEFAULT'))
- __mapper_args__ = {
- 'version_id_col': timestamp,
- 'version_id_generator': False,
- }
- __table_args__ = {
- 'implicit_returning': False
- }
+OUTPUT INSERTED is used. Previous versions of SQLAlchemy therefore had
+limitations for features such as the "ORM Versioning" feature that relies upon
+accurate rowcounts in order to match version numbers with matched rows.
+
+SQLAlchemy 2.0 now retrieves the "rowcount" manually for these particular use
+cases based on counting the rows that arrived back within RETURNING; so while
+the driver still has this limitation, the ORM Versioning feature is no longer
+impacted by it. As of SQLAlchemy 2.0.5, ORM versioning has been fully
+re-enabled for the pyodbc driver.
+
+.. versionchanged:: 2.0.5 ORM versioning support is restored for the pyodbc
+ driver. Previously, a warning would be emitted during ORM flush that
+ versioning was not supported.
+
Enabling Snapshot Isolation
---------------------------
supports_statement_cache = True
supports_default_values = True
supports_empty_insert = False
+ favor_returning_over_lastrowid = True
supports_comments = True
supports_default_metavalue = False
from ...testing.provision import create_db
from ...testing.provision import drop_all_schema_objects_pre_tables
from ...testing.provision import drop_db
+from ...testing.provision import generate_driver_url
from ...testing.provision import get_temp_table_name
from ...testing.provision import log
from ...testing.provision import normalize_sequence
from ...testing.provision import temp_table_keyword_args
+@generate_driver_url.for_db("mssql")
+def generate_driver_url(url, driver, query_str):
+ """Return a copy of *url* rewritten to use *driver*, or ``None``
+ if no dialect module is installed for that driver.
+
+ Registered for the "mssql" backend with the testing provision
+ system, so a single configured mssql URL can be fanned out to
+ additional DBAPI drivers (e.g. ``--dbdriver pymssql``).
+ """
+
+ backend = url.get_backend_name()
+
+ new_url = url.set(drivername="%s+%s" % (backend, driver))
+
+ # the ODBC-style query string (e.g. driver=ODBC+Driver+17+for+SQL+Server)
+ # is only meaningful to pyodbc; strip it for all other drivers
+ if driver != "pyodbc":
+ new_url = new_url.set(query="")
+
+ if query_str:
+ new_url = new_url.update_query_string(query_str)
+
+ # probe that a dialect is importable for this driver combination;
+ # returning None tells provisioning to skip this driver entirely
+ try:
+ new_url.get_dialect()
+ except exc.NoSuchModuleError:
+ return None
+ else:
+ return new_url
+
+
@create_db.for_db("mssql")
def _mssql_create_db(cfg, eng, ident):
with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
pymssql is a Python module that provides a Python DBAPI interface around
`FreeTDS <https://www.freetds.org/>`_.
-.. note::
+.. versionchanged:: 2.0.5
- pymssql is currently not included in SQLAlchemy's continuous integration
- (CI) testing.
+ pymssql was restored to SQLAlchemy's continuous integration testing.
""" # noqa
class MSDialect_pymssql(MSDialect):
supports_statement_cache = True
supports_native_decimal = True
+ supports_native_uuid = True
driver = "pymssql"
preparer = MSIdentifierPreparer_pymssql
Rowcount Support
----------------
-Pyodbc only has partial support for rowcount. See the notes at
-:ref:`mssql_rowcount_versioning` for important notes when using ORM
-versioning.
+Previous limitations with the SQLAlchemy ORM's "versioned rows" feature with
+Pyodbc have been resolved as of SQLAlchemy 2.0.5. See the notes at
+:ref:`mssql_rowcount_versioning`.
.. _mssql_pyodbc_fastexecutemany:
class MSDialect_pyodbc(PyODBCConnector, MSDialect):
supports_statement_cache = True
- # mssql still has problems with this on Linux
+ # note this parameter is no longer used by the ORM or default dialect
+ # see #9414
supports_sane_rowcount_returning = False
- favor_returning_over_lastrowid = True
-
execution_ctx_cls = MSExecutionContext_pyodbc
colspecs = util.update_copy(
supports_default_values = True
supports_default_metavalue = False
+ # sqlite issue:
# https://github.com/python/cpython/issues/93421
+ # note this parameter is no longer used by the ORM or default dialect
+ # see #9414
supports_sane_rowcount_returning = False
supports_empty_insert = False
statement = statement.return_defaults(mapper.version_id_col)
return_defaults = True
- assert_singlerow = (
- connection.dialect.supports_sane_rowcount
- if not return_defaults
- else connection.dialect.supports_sane_rowcount_returning
- )
+ assert_singlerow = connection.dialect.supports_sane_rowcount
assert_multirow = (
assert_singlerow
if mapper._version_id_has_server_side_value:
statement = statement.return_defaults(mapper.version_id_col)
- return_defaults = True
- else:
- return_defaults = False
# execute each UPDATE in the order according to the original
# list of states to guarantee row access order, but
records = list(records)
connection = key[0]
- assert_singlerow = (
- connection.dialect.supports_sane_rowcount
- if not return_defaults
- else connection.dialect.supports_sane_rowcount_returning
- )
+ assert_singlerow = connection.dialect.supports_sane_rowcount
assert_multirow = (
assert_singlerow
and connection.dialect.supports_sane_multi_rowcount
"""
return exclusions.open()
+ @property
+ def arraysize(self):
+ """dialect includes the required pep-249 attribute
+ ``cursor.arraysize``"""
+
+ return exclusions.open()
+
@property
def emulated_lastrowid(self):
"""target dialect retrieves cursor.lastrowid, or fetches
return exclusions.closed()
+ @property
+ def date_implicit_bound(self):
+ """target dialect when given a date object will bind it such
+ that the database server knows the object is a date, and not
+ a plain string.
+
+ """
+ return exclusions.open()
+
+ @property
+ def time_implicit_bound(self):
+ """target dialect when given a time object will bind it such
+ that the database server knows the object is a time, and not
+ a plain string.
+
+ """
+ return exclusions.open()
+
@property
def datetime_implicit_bound(self):
"""target dialect when given a datetime object will bind it such
from sqlalchemy import bindparam
from sqlalchemy import Column
from sqlalchemy import Integer
+from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
)
eq_(r.rowcount, 3)
- @testing.requires.update_returning
- def test_update_rowcount_return_defaults(self, connection):
+ @testing.variation("implicit_returning", [True, False])
+ @testing.variation(
+ "dml",
+ [
+ ("update", testing.requires.update_returning),
+ ("delete", testing.requires.delete_returning),
+ ],
+ )
+ def test_update_delete_rowcount_return_defaults(
+ self, connection, implicit_returning, dml
+ ):
"""note this test should succeed for all RETURNING backends
as of 2.0. In
Idf28379f8705e403a3c6a937f6a798a042ef2540 we changed rowcount to use
len(rows) when we have implicit returning
"""
- employees_table = self.tables.employees
+
+ if implicit_returning:
+ employees_table = self.tables.employees
+ else:
+ employees_table = Table(
+ "employees",
+ MetaData(),
+ Column(
+ "employee_id",
+ Integer,
+ autoincrement=False,
+ primary_key=True,
+ ),
+ Column("name", String(50)),
+ Column("department", String(1)),
+ implicit_returning=False,
+ )
department = employees_table.c.department
- stmt = (
- employees_table.update()
- .where(department == "C")
- .values(name=employees_table.c.department + "Z")
- .return_defaults()
- )
+
+ if dml.update:
+ stmt = (
+ employees_table.update()
+ .where(department == "C")
+ .values(name=employees_table.c.department + "Z")
+ .return_defaults()
+ )
+ elif dml.delete:
+ stmt = (
+ employees_table.delete()
+ .where(department == "C")
+ .return_defaults()
+ )
+ else:
+ dml.fail()
r = connection.execute(stmt)
eq_(r.rowcount, 3)
Column("decorated_date_data", Decorated),
)
- @testing.requires.datetime_implicit_bound
- def test_select_direct(self, connection):
- result = connection.scalar(select(literal(self.data)))
- eq_(result, self.data)
-
def test_round_trip(self, connection):
date_table = self.tables.date_table
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
+ @testing.requires.datetime_implicit_bound
+ def test_select_direct(self, connection):
+ result = connection.scalar(select(literal(self.data)))
+ eq_(result, self.data)
+
class DateTimeTZTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("datetime_timezone",)
2012, 10, 15, 12, 57, 18, tzinfo=datetime.timezone.utc
)
+ @testing.requires.datetime_implicit_bound
+ def test_select_direct(self, connection):
+ result = connection.scalar(select(literal(self.data)))
+ eq_(result, self.data)
+
class DateTimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("datetime_microseconds",)
datatype = Time
data = datetime.time(12, 57, 18)
+ @testing.requires.time_implicit_bound
+ def test_select_direct(self, connection):
+ result = connection.scalar(select(literal(self.data)))
+ eq_(result, self.data)
+
class TimeTZTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("time_timezone",)
datatype = Time(timezone=True)
data = datetime.time(12, 57, 18, tzinfo=datetime.timezone.utc)
+ @testing.requires.time_implicit_bound
+ def test_select_direct(self, connection):
+ result = connection.scalar(select(literal(self.data)))
+ eq_(result, self.data)
+
class TimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("time_microseconds",)
datatype = Time
data = datetime.time(12, 57, 18, 396)
+ @testing.requires.time_implicit_bound
+ def test_select_direct(self, connection):
+ result = connection.scalar(select(literal(self.data)))
+ eq_(result, self.data)
+
class DateTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("date",)
datatype = Date
data = datetime.date(2012, 10, 15)
+ @testing.requires.date_implicit_bound
+ def test_select_direct(self, connection):
+ result = connection.scalar(select(literal(self.data)))
+ eq_(result, self.data)
+
class DateTimeCoercedToDateTimeTest(_DateFixture, fixtures.TablesTest):
"""this particular suite is testing that datetime parameters get
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
compare = datetime.date(2012, 10, 15)
+ @testing.requires.datetime_implicit_bound
+ def test_select_direct(self, connection):
+ result = connection.scalar(select(literal(self.data)))
+ eq_(result, self.data)
+
class DateTimeHistoricTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("datetime_historic",)
datatype = DateTime
data = datetime.datetime(1850, 11, 10, 11, 52, 35)
+ @testing.requires.datetime_implicit_bound
+ def test_select_direct(self, connection):
+ result = connection.scalar(select(literal(self.data)))
+ eq_(result, self.data)
+
class DateHistoricTest(_DateFixture, fixtures.TablesTest):
__requires__ = ("date_historic",)
datatype = Date
data = datetime.date(1727, 4, 1)
+ @testing.requires.date_implicit_bound
+ def test_select_direct(self, connection):
+ result = connection.scalar(select(literal(self.data)))
+ eq_(result, self.data)
+
class IntegerTest(_LiteralRoundTripFixture, fixtures.TestBase):
__backend__ = True
mariadb = mariadb+mysqldb://scott:tiger@127.0.0.1:3306/test
mariadb_connector = mariadb+mariadbconnector://scott:tiger@127.0.0.1:3306/test
mssql = mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+13+for+SQL+Server
-mssql_pymssql = mssql+pymssql://scott:tiger@ms_2008
+pymssql = mssql+pymssql://scott:tiger^5HHH@mssql2017:1433/test
docker_mssql = mssql+pyodbc://scott:tiger^5HHH@127.0.0.1:1433/test?driver=ODBC+Driver+17+for+SQL+Server
oracle = oracle+cx_oracle://scott:tiger@oracle18c/xe
cxoracle = oracle+cx_oracle://scott:tiger@oracle18c/xe
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy.dialects.mssql import base as mssql
+from sqlalchemy.dialects.mssql import pyodbc as mssql_pyodbc
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import config
from sqlalchemy.testing import engines
return result.scalar() == 0
-class MatchTest(fixtures.TablesTest, AssertsCompiledSQL):
+class MatchTest(AssertsCompiledSQL, fixtures.TablesTest):
__only_on__ = "mssql"
__skip_if__ = (full_text_search_missing,)
self.assert_compile(
matchtable.c.title.match("somstr"),
"CONTAINS (matchtable.title, ?)",
+ dialect=mssql_pyodbc.dialect(paramstyle="qmark"),
)
def test_simple_match(self, connection):
)
return t, (d1, t1, d2, d3)
- def test_date_roundtrips(self, date_fixture, connection):
+ def test_date_roundtrips_no_offset(self, date_fixture, connection):
+ t, (d1, t1, d2, d3) = date_fixture
+ connection.execute(
+ t.insert(),
+ dict(
+ adate=d1,
+ adatetime=d2,
+ atime1=t1,
+ atime2=d2,
+ ),
+ )
+
+ row = connection.execute(t.select()).first()
+ eq_(
+ (
+ row.adate,
+ row.adatetime,
+ row.atime1,
+ row.atime2,
+ ),
+ (
+ d1,
+ d2,
+ t1,
+ d2.time(),
+ ),
+ )
+
+ @testing.skip_if("+pymssql", "offsets dont seem to work")
+ def test_date_roundtrips_w_offset(self, date_fixture, connection):
t, (d1, t1, d2, d3) = date_fixture
connection.execute(
t.insert(),
(datetime.datetime(2007, 10, 30, 11, 2, 32)),
argnames="date",
)
+ @testing.skip_if("+pymssql", "unknown failures")
def test_tz_present_or_non_in_dates(self, date_fixture, connection, date):
t, (d1, t1, d2, d3) = date_fixture
connection.execute(
id_="iaaa",
argnames="dto_param_value, expected_offset_hours, should_fail",
)
+ @testing.skip_if("+pymssql", "offsets dont seem to work")
def test_datetime_offset(
self,
datetimeoffset_fixture,
update=False, delete=False, only_returning=False
):
warnings = ()
- if (
- only_returning
- and not testing.db.dialect.supports_sane_rowcount_returning
- ) or (
- not only_returning and not testing.db.dialect.supports_sane_rowcount
- ):
+ if not testing.db.dialect.supports_sane_rowcount:
if update:
warnings += (
"Dialect .* does not support "
eq_(f1.version_id, 2)
- @testing.requires.sane_rowcount_w_returning
@testing.requires.updateable_autoincrement_pks
@testing.requires.update_returning
def test_sql_expr_w_mods_bump(self):
self.assert_sql_execution(testing.db, sess.flush, *statements)
@testing.requires.independent_connections
- @testing.requires.sane_rowcount_w_returning
def test_concurrent_mod_err_expire_on_commit(self):
sess = self._fixture()
)
@testing.requires.independent_connections
- @testing.requires.sane_rowcount_w_returning
def test_concurrent_mod_err_noexpire_on_commit(self):
sess = self._fixture(expire_on_commit=False)
[no_support("oracle", "ORA-03001: unimplemented feature")]
)
+ @property
+ def arraysize(self):
+ return skip_if("+pymssql", "DBAPI is missing this attribute")
+
@property
def emulated_lastrowid(self):
""" "target dialect retrieves cursor.lastrowid or an equivalent
return exclusions.open()
+ @property
+ def date_implicit_bound(self):
+ """target dialect when given a date object will bind it such
+ that the database server knows the object is a date, and not
+ a plain string.
+
+ """
+
+ # mariadbconnector works. pyodbc we don't know, not supported in
+ # testing.
+ return exclusions.fails_on(
+ [
+ "+mysqldb",
+ "+pymysql",
+ "+asyncmy",
+ "+mysqlconnector",
+ "+cymysql",
+ "+aiomysql",
+ ]
+ )
+
+ @property
+ def time_implicit_bound(self):
+ """target dialect when given a time object will bind it such
+ that the database server knows the object is a time, and not
+ a plain string.
+
+ """
+
+ # mariadbconnector works. pyodbc we don't know, not supported in
+ # testing.
+ return exclusions.fails_on(
+ [
+ "+mysqldb",
+ "+pymysql",
+ "+asyncmy",
+ "+mysqlconnector",
+ "+cymysql",
+ "+aiomysql",
+ ]
+ )
+
@property
def datetime_implicit_bound(self):
"""target dialect when given a datetime object will bind it such
- that the database server knows the object is a datetime, and not
+ that the database server knows the object is a datetime, and not
a plain string.
"""
"+mysqlconnector",
"+cymysql",
"+aiomysql",
+ "+pymssql",
]
)
metadata,
Column(
"date_id",
- DateTime(timezone=True),
+ # we want no tzinfo normally since pymssql doesn't do
+ # it right now
+ DateTime().with_variant(
+ DateTime(timezone=True), "postgresql"
+ ),
default=text("current_timestamp"),
primary_key=True,
),
rows.append(row)
eq_(len(rows), 2)
+ @testing.requires.arraysize
def test_fetchmany_arraysize_default(self, connection):
users = self.tables.users
eq_(len(rows), min(arraysize, 150))
+ @testing.requires.arraysize
def test_fetchmany_arraysize_set(self, connection):
users = self.tables.users
oracle: oracle
oracle: oracle_oracledb
mssql: mssql
+ mssql: mssql_pymssql
deps=
pytest>=7.0.0rc1,<8
# py3{,7,8,9,10,11}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector --dbdriver asyncmy}
py3{,7,8,9,10,11}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver asyncmy}
-
mssql: MSSQL={env:TOX_MSSQL:--db mssql}
+ py3{,7,8,9,10,11}-mssql: EXTRA_MSSQL_DRIVERS={env:EXTRA_MSSQL_DRIVERS:--dbdriver pyodbc --dbdriver pymssql}
oracle,mssql,sqlite_file: IDENTS=--write-idents db_idents.txt
# that flag for coverage mode.
nocext: sh -c "rm -f lib/sqlalchemy/*.so"
- {env:BASECOMMAND} {env:WORKERS} {env:SQLITE:} {env:EXTRA_SQLITE_DRIVERS:} {env:POSTGRESQL:} {env:EXTRA_PG_DRIVERS:} {env:MYSQL:} {env:EXTRA_MYSQL_DRIVERS:} {env:ORACLE:} {env:EXTRA_ORACLE_DRIVERS:} {env:MSSQL:} {env:IDENTS:} {env:PYTEST_EXCLUDES:} {env:COVERAGE:} {posargs}
+ {env:BASECOMMAND} {env:WORKERS} {env:SQLITE:} {env:EXTRA_SQLITE_DRIVERS:} {env:POSTGRESQL:} {env:EXTRA_PG_DRIVERS:} {env:MYSQL:} {env:EXTRA_MYSQL_DRIVERS:} {env:ORACLE:} {env:EXTRA_ORACLE_DRIVERS:} {env:MSSQL:} {env:EXTRA_MSSQL_DRIVERS:} {env:IDENTS:} {env:PYTEST_EXCLUDES:} {env:COVERAGE:} {posargs}
oracle,mssql,sqlite_file: python reap_dbs.py db_idents.txt