--- /dev/null
+.. change::
+ :tags: bug, mssql
+ :tickets: 4983
+
+ Repaired support for the :class:`.mssql.DATETIMEOFFSET` datatype on PyODBC,
+ by adding PyODBC-level result handlers as it does not include native
+ support for this datatype. This includes usage of the Python 3 "timezone"
+ tzinfo subclass in order to set up a timezone, which on Python 2 makes
+ use of a minimal backport of "timezone" in sqlalchemy.util.
+
""" # noqa
+import datetime
import decimal
import re
+import struct
from .base import BINARY
+from .base import DATETIMEOFFSET
from .base import MSDialect
from .base import MSExecutionContext
from .base import VARBINARY
return process
+class _ODBCDateTimeOffset(DATETIMEOFFSET):
+ def bind_processor(self, dialect):
+ def process(value):
+ """Convert to string format required by T-SQL."""
+ dto_string = value.strftime("%Y-%m-%d %H:%M:%S %z")
+ # offset needs a colon, e.g., -0700 -> -07:00
+ return dto_string[:23] + ":" + dto_string[23:]
+
+ return process
+
+
class _VARBINARY_pyodbc(_ms_binary_pyodbc, VARBINARY):
pass
sqltypes.Numeric: _MSNumeric_pyodbc,
sqltypes.Float: _MSFloat_pyodbc,
BINARY: _BINARY_pyodbc,
+ DATETIMEOFFSET: _ODBCDateTimeOffset,
# SQL Server dialect has a VARBINARY that is just to support
# "deprecate_large_types" w/ VARBINARY(max), but also we must
# handle the usual SQL standard VARBINARY
pass
return tuple(version)
+ def on_connect(self):
+ super_ = super(MSDialect_pyodbc, self).on_connect()
+
+ def on_connect(conn):
+ if super_ is not None:
+ super_(conn)
+
+ self._setup_timestampoffset_type(conn)
+
+ return on_connect
+
+ def _setup_timestampoffset_type(self, connection):
+ # output converter function for datetimeoffset
+ def _handle_datetimeoffset(dto_value):
+ tup = struct.unpack("<6hI2h", dto_value)
+ return datetime.datetime(
+ tup[0],
+ tup[1],
+ tup[2],
+ tup[3],
+ tup[4],
+ tup[5],
+ tup[6] // 1000,
+ util.timezone(
+ datetime.timedelta(hours=tup[7], minutes=tup[8])
+ ),
+ )
+
+ odbc_SQL_SS_TIMESTAMPOFFSET = -155 # as defined in SQLNCLI.h
+ connection.add_output_converter(
+ odbc_SQL_SS_TIMESTAMPOFFSET, _handle_datetimeoffset
+ )
+
def do_executemany(self, cursor, statement, parameters, context=None):
if self.fast_executemany:
cursor.fast_executemany = True
from .compat import StringIO # noqa
from .compat import text_type # noqa
from .compat import threading # noqa
+from .compat import timezone # noqa
from .compat import u # noqa
from .compat import ue # noqa
from .compat import unquote # noqa
return meta(name, bases, d)
return metaclass("temporary_class", None, {})
+
+
+if py3k:
+ from datetime import timezone
+else:
+ from datetime import datetime
+ from datetime import timedelta
+ from datetime import tzinfo
+
+ class timezone(tzinfo):
+ """Minimal port of python 3 timezone object"""
+
+ __slots__ = "_offset"
+
+ def __init__(self, offset):
+ if not isinstance(offset, timedelta):
+ raise TypeError("offset must be a timedelta")
+ if not self._minoffset <= offset <= self._maxoffset:
+ raise ValueError(
+ "offset must be a timedelta "
+ "strictly between -timedelta(hours=24) and "
+ "timedelta(hours=24)."
+ )
+ self._offset = offset
+
+ def __eq__(self, other):
+ if type(other) != timezone:
+ return False
+ return self._offset == other._offset
+
+ def __hash__(self):
+ return hash(self._offset)
+
+ def __repr__(self):
+ return "sqlalchemy.util.%s(%r)" % (
+ self.__class__.__name__,
+ self._offset,
+ )
+
+ def __str__(self):
+ return self.tzname(None)
+
+ def utcoffset(self, dt):
+ return self._offset
+
+ def tzname(self, dt):
+ return self._name_from_offset(self._offset)
+
+ def dst(self, dt):
+ return None
+
+ def fromutc(self, dt):
+ if isinstance(dt, datetime):
+ if dt.tzinfo is not self:
+ raise ValueError("fromutc: dt.tzinfo " "is not self")
+ return dt + self._offset
+ raise TypeError(
+ "fromutc() argument must be a datetime instance" " or None"
+ )
+
+ @staticmethod
+ def _timedelta_to_microseconds(timedelta):
+ """backport of timedelta._to_microseconds()"""
+ return (
+ timedelta.days * (24 * 3600) + timedelta.seconds
+ ) * 1000000 + timedelta.microseconds
+
+ @staticmethod
+ def _divmod_timedeltas(a, b):
+ """backport of timedelta.__divmod__"""
+
+ q, r = divmod(
+ timezone._timedelta_to_microseconds(a),
+ timezone._timedelta_to_microseconds(b),
+ )
+ return q, timedelta(0, 0, r)
+
+ @staticmethod
+ def _name_from_offset(delta):
+ if not delta:
+ return "UTC"
+ if delta < timedelta(0):
+ sign = "-"
+ delta = -delta
+ else:
+ sign = "+"
+ hours, rest = timezone._divmod_timedeltas(
+ delta, timedelta(hours=1)
+ )
+ minutes, rest = timezone._divmod_timedeltas(
+ rest, timedelta(minutes=1)
+ )
+ result = "UTC%s%02d:%02d" % (sign, hours, minutes)
+ if rest.seconds:
+ result += ":%02d" % (rest.seconds,)
+ if rest.microseconds:
+ result += ".%06d" % (rest.microseconds,)
+ return result
+
+ _maxoffset = timedelta(hours=23, minutes=59)
+ _minoffset = -_maxoffset
+
+ timezone.utc = timezone(timedelta(0))
#! coding: utf-8
import copy
+import datetime
import inspect
import sys
from sqlalchemy.util import compat
from sqlalchemy.util import get_callable_argspec
from sqlalchemy.util import langhelpers
+from sqlalchemy.util import timezone
from sqlalchemy.util import WeakSequence
compat.decode_backslashreplace(message, "cp1251"),
util.u("some message П"),
)
+
+
+class TimezoneTest(fixtures.TestBase):
+ """test the python 2 backport of the "timezone" class.
+
+ Note under python 3, these tests work against the builtin timezone,
+ thereby providing confirmation that the tests are correct.
+
+ """
+
+ @testing.combinations(
+ (datetime.timedelta(0), "UTC"),
+ (datetime.timedelta(hours=5), "UTC+05:00"),
+ (datetime.timedelta(hours=5, minutes=10), "UTC+05:10"),
+ (datetime.timedelta(hours=5, minutes=10, seconds=27), "UTC+05:10:27"),
+ (datetime.timedelta(hours=-3, minutes=10), "UTC-02:50"),
+ (
+ datetime.timedelta(
+ hours=5, minutes=10, seconds=27, microseconds=550
+ ),
+ "UTC+05:10:27.000550",
+ ),
+ )
+ def test_tzname(self, td, expected):
+ eq_(timezone(td).tzname(None), expected)
+
+ def test_utcoffset(self):
+ eq_(
+ timezone(datetime.timedelta(hours=5)).utcoffset(None),
+ datetime.timedelta(hours=5),
+ )
+
+ def test_fromutc(self):
+ tzinfo = timezone(datetime.timedelta(hours=5))
+ dt = datetime.datetime(2017, 10, 5, 12, 55, 38, tzinfo=tzinfo)
+ eq_(
+ dt.astimezone(timezone.utc),
+ datetime.datetime(2017, 10, 5, 7, 55, 38, tzinfo=timezone.utc),
+ )
+
+ # this is the same as hours=-3
+ del_ = datetime.timedelta(days=-1, seconds=75600)
+ eq_(
+ dt.astimezone(timezone(datetime.timedelta(hours=-3))),
+ datetime.datetime(2017, 10, 5, 4, 55, 38, tzinfo=timezone(del_)),
+ )
+
+ @testing.requires.python3
+ def test_repr_py3k(self):
+ eq_(
+ repr(timezone(datetime.timedelta(hours=5))),
+ "datetime.timezone(%r)" % (datetime.timedelta(hours=5)),
+ )
+
+ @testing.requires.python2
+ def test_repr_py2k(self):
+ eq_(
+ repr(timezone(datetime.timedelta(hours=5))),
+ "sqlalchemy.util.timezone(%r)" % (datetime.timedelta(hours=5)),
+ )
from sqlalchemy.dialects.mssql import ROWVERSION
from sqlalchemy.dialects.mssql import TIMESTAMP
from sqlalchemy.dialects.mssql.base import _MSDate
+from sqlalchemy.dialects.mssql.base import DATETIMEOFFSET
from sqlalchemy.dialects.mssql.base import MS_2005_VERSION
from sqlalchemy.dialects.mssql.base import MS_2008_VERSION
from sqlalchemy.dialects.mssql.base import TIME
Column("adate", Date),
Column("atime", Time),
Column("adatetime", DateTime),
+ Column("adatetimeoffset", DATETIMEOFFSET),
)
metadata.create_all()
d1 = datetime.date(2007, 10, 30)
t1 = datetime.time(11, 2, 32)
d2 = datetime.datetime(2007, 10, 30, 11, 2, 32)
- t.insert().execute(adate=d1, adatetime=d2, atime=t1)
+ dto = datetime.datetime(
+ 2007,
+ 10,
+ 30,
+ 11,
+ 2,
+ 32,
+ 0,
+ util.timezone(datetime.timedelta(hours=1)),
+ )
+ t.insert().execute(
+ adate=d1, adatetime=d2, atime=t1, adatetimeoffset=dto
+ )
# NOTE: this previously passed 'd2' for "adate" even though
# "adate" is a date column; we asserted that it truncated w/o issue.
# As of pyodbc 4.0.22, this is no longer accepted, was accepted
# in 4.0.21. See also the new pyodbc assertions regarding numeric
# precision.
- t.insert().execute(adate=d1, adatetime=d2, atime=d2)
+ t.insert().execute(
+ adate=d1, adatetime=d2, atime=d2, adatetimeoffset=dto
+ )
x = t.select().execute().fetchall()[0]
self.assert_(x.adate.__class__ == datetime.date)
self.assert_(x.atime.__class__ == datetime.time)
self.assert_(x.adatetime.__class__ == datetime.datetime)
+ self.assert_(x.adatetimeoffset.__class__ == datetime.datetime)
t.delete().execute()
- t.insert().execute(adate=d1, adatetime=d2, atime=t1)
+ t.insert().execute(
+ adate=d1, adatetime=d2, atime=t1, adatetimeoffset=dto
+ )
eq_(
- select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate == d1)
+ select(
+ [t.c.adate, t.c.atime, t.c.adatetime, t.c.adatetimeoffset],
+ t.c.adate == d1,
+ )
.execute()
.fetchall(),
- [(d1, t1, d2)],
+ [(d1, t1, d2, dto)],
)
@emits_warning_on("mssql+mxodbc", r".*does not have any indexes.*")