""" # noqa
+from datetime import datetime, timezone, timedelta
import decimal
import re
+import struct
from .base import BINARY
+from .base import DATETIMEOFFSET
from .base import MSDialect
from .base import MSExecutionContext
from .base import VARBINARY
return process
+class _ODBCDateTimeOffset(DATETIMEOFFSET):
+
+ def bind_processor(self, dialect):
+ def process(value):
+ """Convert to string format required by T-SQL."""
+ dto_string = value.strftime("%Y-%m-%d %H:%M:%S %z")
+ # offset needs a colon, e.g., -0700 -> -07:00
+ return dto_string[:23] + ":" + dto_string[23:]
+
+ return process
+
+
class _VARBINARY_pyodbc(_ms_binary_pyodbc, VARBINARY):
pass
sqltypes.Numeric: _MSNumeric_pyodbc,
sqltypes.Float: _MSFloat_pyodbc,
BINARY: _BINARY_pyodbc,
+ DATETIMEOFFSET: _ODBCDateTimeOffset,
# SQL Server dialect has a VARBINARY that is just to support
# "deprecate_large_types" w/ VARBINARY(max), but also we must
# handle the usual SQL standard VARBINARY
pass
return tuple(version)
+ def on_connect(self):
+ super_ = super(MSDialect_pyodbc, self).on_connect()
+
+ def on_connect(conn):
+ if super_ is not None:
+ super_(conn)
+
+ # output converter function for datetimeoffset
+ def _handle_datetimeoffset(dto_value):
+ # ref: https://github.com/mkleehammer/pyodbc/issues/134#issuecomment-281739794
+ tup = struct.unpack("<6hI2h", dto_value) # e.g., (2017, 3, 16, 10, 35, 18, 0, -6, 0)
+ return datetime(tup[0], tup[1], tup[2], tup[3], tup[4], tup[5], tup[6] // 1000,
+ timezone(timedelta(hours=tup[7], minutes=tup[8])))
+
+ odbc_SQL_SS_TIMESTAMPOFFSET = -155 # as defined in SQLNCLI.h
+ conn.add_output_converter(odbc_SQL_SS_TIMESTAMPOFFSET, _handle_datetimeoffset)
+
+ return on_connect
+
def do_executemany(self, cursor, statement, parameters, context=None):
if self.fast_executemany:
cursor.fast_executemany = True
from sqlalchemy.dialects.mssql import ROWVERSION
from sqlalchemy.dialects.mssql import TIMESTAMP
from sqlalchemy.dialects.mssql.base import _MSDate
+from sqlalchemy.dialects.mssql.base import DATETIMEOFFSET
from sqlalchemy.dialects.mssql.base import MS_2005_VERSION
from sqlalchemy.dialects.mssql.base import MS_2008_VERSION
from sqlalchemy.dialects.mssql.base import TIME
Column("adate", Date),
Column("atime", Time),
Column("adatetime", DateTime),
+ Column("adatetimeoffset", DATETIMEOFFSET),
)
metadata.create_all()
d1 = datetime.date(2007, 10, 30)
t1 = datetime.time(11, 2, 32)
d2 = datetime.datetime(2007, 10, 30, 11, 2, 32)
- t.insert().execute(adate=d1, adatetime=d2, atime=t1)
+ dto = datetime.datetime(2007, 10, 30, 11, 2, 32, 0,
+ datetime.timezone(datetime.timedelta(hours=1)))
+ t.insert().execute(adate=d1, adatetime=d2, atime=t1, adatetimeoffset=dto)
# NOTE: this previously passed 'd2' for "adate" even though
# "adate" is a date column; we asserted that it truncated w/o issue.
# As of pyodbc 4.0.22, this is no longer accepted, was accepted
# in 4.0.21. See also the new pyodbc assertions regarding numeric
# precision.
- t.insert().execute(adate=d1, adatetime=d2, atime=d2)
+ t.insert().execute(adate=d1, adatetime=d2, atime=d2, adatetimeoffset=dto)
x = t.select().execute().fetchall()[0]
self.assert_(x.adate.__class__ == datetime.date)
self.assert_(x.atime.__class__ == datetime.time)
self.assert_(x.adatetime.__class__ == datetime.datetime)
+ self.assert_(x.adatetimeoffset.__class__ == datetime.datetime)
t.delete().execute()
- t.insert().execute(adate=d1, adatetime=d2, atime=t1)
+ t.insert().execute(adate=d1, adatetime=d2, atime=t1, adatetimeoffset=dto)
eq_(
- select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate == d1)
+ select([t.c.adate, t.c.atime, t.c.adatetime, t.c.adatetimeoffset], t.c.adate == d1)
.execute()
.fetchall(),
- [(d1, t1, d2)],
+ [(d1, t1, d2, dto)],
)
@emits_warning_on("mssql+mxodbc", r".*does not have any indexes.*")