If ``nullable`` is ``True`` or ``False`` then the column will be
``NULL`` or ``NOT NULL``, respectively.
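+
+For example (a minimal sketch; the table and column names are
+illustrative), a column declared with ``nullable=False`` is rendered
+with ``NOT NULL`` in the emitted DDL, and one declared with
+``nullable=True`` is rendered with ``NULL``::
+
+    from sqlalchemy import Table, Column, Integer, String, MetaData
+
+    t = Table('example', MetaData(),
+              Column('id', Integer, primary_key=True),
+              Column('name', String(50), nullable=False),  # NOT NULL
+              Column('note', String(50), nullable=True))   # NULL
+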
+Date / Time Handling
+--------------------
+For MSSQL versions that support the ``DATE`` and ``TIME`` types
+(MSSQL 2008 and later), those types are used directly. For earlier
+versions, a ``DATETIME`` column is used instead and the MSSQL
+dialect converts the results back to the appropriate date or time
+values. This means ``Date()`` and ``Time()`` are fully supported on
+all versions of MSSQL. If you do not want this behavior, do not use
+the ``Date()`` or ``Time()`` types.
+
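+For example, a table defined with the generic ``Date()`` and
+``Time()`` types (a minimal sketch; the table and column names are
+illustrative) is created with ``DATE`` and ``TIME`` columns on MSSQL
+2008 and later, and with ``DATETIME`` columns on earlier versions::
+
+    from sqlalchemy import Table, Column, Integer, Date, Time, MetaData
+
+    t = Table('example_dates', MetaData(),
+              Column('id', Integer, primary_key=True),
+              Column('adate', Date()),  # DATE on 2008+, else DATETIME
+              Column('atime', Time()))  # TIME on 2008+, else DATETIME
+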
Compatibility Levels
--------------------
MSSQL supports the notion of setting compatibility levels at the
return "SMALLINT"
-class MSDateTime(sqltypes.DateTime):
- def __init__(self, *a, **kw):
- super(MSDateTime, self).__init__(False)
+class _DateTimeType(object):
+ """Base for MSSQL datetime types."""
+ def bind_processor(self, dialect):
+ # if we receive just a date we can manipulate it
+ # into a datetime since the db-api may not do this.
+ def process(value):
+ if type(value) is datetime.date:
+ return datetime.datetime(value.year, value.month, value.day)
+ return value
+ return process
+
+
+class MSDateTime(_DateTimeType, sqltypes.DateTime):
def get_col_spec(self):
return "DATETIME"
-class MSSmallDate(sqltypes.Date):
- def __init__(self, *a, **kw):
- super(MSSmallDate, self).__init__(False)
+class MSDate(sqltypes.Date):
+ def get_col_spec(self):
+ return "DATE"
+
+
+class MSTime(sqltypes.Time):
+ def __init__(self, precision=None, **kwargs):
+ self.precision = precision
+ super(MSTime, self).__init__()
+ def get_col_spec(self):
+ if self.precision:
+ return "TIME(%s)" % self.precision
+ else:
+ return "TIME"
+
+
+class MSSmallDateTime(_DateTimeType, sqltypes.TypeEngine):
def get_col_spec(self):
return "SMALLDATETIME"
- def result_processor(self, dialect):
- def process(value):
- # If the DBAPI returns the value as datetime.datetime(), truncate it back to datetime.date()
- if type(value) is datetime.datetime:
- return value.date()
- return value
- return process
+class MSDateTime2(_DateTimeType, sqltypes.TypeEngine):
+ def __init__(self, precision=None, **kwargs):
+ self.precision = precision
-class MSDate(sqltypes.Date):
- def __init__(self, *a, **kw):
+ def get_col_spec(self):
+ if self.precision:
+ return "DATETIME2(%s)" % self.precision
+ else:
+ return "DATETIME2"
+
+
+class MSDateTimeOffset(_DateTimeType, sqltypes.TypeEngine):
+ def __init__(self, precision=None, **kwargs):
+ self.precision = precision
+
+ def get_col_spec(self):
+ if self.precision:
+ return "DATETIMEOFFSET(%s)" % self.precision
+ else:
+ return "DATETIMEOFFSET"
+
+
+class MSDateTimeAsDate(_DateTimeType, MSDate):
+ """ This is an implementation of the Date type for versions of MSSQL that
+    do not support that specific type. In order to make it work, a ``DATETIME``
+    column specification is used and the results are converted back to just
+ the date portion.
+
+ """
+
+ def __init__(self, *args, **kwargs):
super(MSDate, self).__init__(False)
def get_col_spec(self):
def result_processor(self, dialect):
def process(value):
- # If the DBAPI returns the value as datetime.datetime(), truncate it back to datetime.date()
+ # If the DBAPI returns the value as datetime.datetime(), truncate
+ # it back to datetime.date()
if type(value) is datetime.datetime:
return value.date()
return value
return process
-class MSTime(sqltypes.Time):
+class MSDateTimeAsTime(MSTime):
+ """ This is an implementation of the Time type for versions of MSSQL that
+    do not support that specific type. In order to make it work, a ``DATETIME``
+    column specification is used and the results are converted back to just
+ the time portion.
+
+ """
+
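+    # the "zero" date used when a bare time value has to be stored
+    # in a DATETIME column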
__zero_date = datetime.date(1900, 1, 1)
def __init__(self, *a, **kw):
return process
-class MSDateTime_pyodbc(MSDateTime):
- def bind_processor(self, dialect):
- def process(value):
- if type(value) is datetime.date:
- return datetime.datetime(value.year, value.month, value.day)
- return value
- return process
-
-
-class MSDate_pyodbc(MSDate):
- def bind_processor(self, dialect):
- def process(value):
- if type(value) is datetime.date:
- return datetime.datetime(value.year, value.month, value.day)
- return value
- return process
-
-
class MSText(_StringType, sqltypes.Text):
"""MSSQL TEXT type, for variable-length text up to 2^31 characters."""
'numeric' : MSNumeric,
'float' : MSFloat,
'datetime' : MSDateTime,
+ 'datetime2' : MSDateTime2,
+ 'datetimeoffset' : MSDateTimeOffset,
'date': MSDate,
- 'smalldatetime' : MSSmallDate,
+ 'time': MSTime,
+ 'smalldatetime' : MSSmallDateTime,
'binary' : MSBinary,
'varbinary' : MSVarBinary,
'bit': MSBoolean,
def __init__(self, description_encoding='latin-1', **params):
super(MSSQLDialect_pyodbc, self).__init__(**params)
self.description_encoding = description_encoding
+
+ self.colspecs = MSSQLDialect.colspecs.copy()
+ self.ischema_names = MSSQLDialect.ischema_names.copy()
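+        # DATE and TIME are only available on SQL Server 2008
+        # (version 10) and later; older servers fall back to the
+        # DATETIME-based implementations.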
+ if self.server_version_info < (10,):
+ self.colspecs[sqltypes.Date] = MSDateTimeAsDate
+ self.colspecs[sqltypes.Time] = MSDateTimeAsTime
+
# FIXME: scope_identity sniff should look at server version, not the ODBC driver
# whether use_scope_identity will work depends on the version of pyodbc
try:
import pyodbc as module
return module
- colspecs = MSSQLDialect.colspecs.copy()
- colspecs[sqltypes.Date] = MSDate_pyodbc
- colspecs[sqltypes.DateTime] = MSDateTime_pyodbc
- ischema_names = MSSQLDialect.ischema_names.copy()
- ischema_names['smalldatetime'] = MSDate_pyodbc
- ischema_names['datetime'] = MSDateTime_pyodbc
-
def make_connect_string(self, keys, query):
if 'max_identifier_length' in keys:
self.max_identifier_length = int(keys.pop('max_identifier_length'))
import testenv; testenv.configure_for_tests()
-import os, pickleable, re
+import datetime, os, pickleable, re
from sqlalchemy import *
from sqlalchemy import types, exc
from sqlalchemy.orm import *
raise
money_table.drop()
+ def test_dates(self):
+ "Exercise type specification for date types."
+
+ columns = [
+            # column type, args, kwargs, expected ddl,
+            # server version requirement[, expected reflected type]
+ (mssql.MSDateTime, [], {},
+ 'DATETIME', []),
+
+ (mssql.MSDate, [], {},
+ 'DATE', ['>=', (10,)]),
+ (mssql.MSDate, [], {},
+ 'DATETIME', ['<', (10,)], mssql.MSDateTime),
+
+ (mssql.MSTime, [], {},
+ 'TIME', ['>=', (10,)]),
+ (mssql.MSTime, [1], {},
+ 'TIME(1)', ['>=', (10,)]),
+ (mssql.MSTime, [], {},
+ 'DATETIME', ['<', (10,)], mssql.MSDateTime),
+
+ (mssql.MSSmallDateTime, [], {},
+ 'SMALLDATETIME', []),
+
+ (mssql.MSDateTimeOffset, [], {},
+ 'DATETIMEOFFSET', ['>=', (10,)]),
+ (mssql.MSDateTimeOffset, [1], {},
+ 'DATETIMEOFFSET(1)', ['>=', (10,)]),
+
+ (mssql.MSDateTime2, [], {},
+ 'DATETIME2', ['>=', (10,)]),
+ (mssql.MSDateTime2, [1], {},
+ 'DATETIME2(1)', ['>=', (10,)]),
+
+ ]
+
+ table_args = ['test_mssql_dates', MetaData(testing.db)]
+ for index, spec in enumerate(columns):
+ type_, args, kw, res, requires = spec[0:5]
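+            # only create the column when the server version
+            # requirement (if any) for this type is satisfied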
+ if (requires and testing._is_excluded('mssql', *requires)) or not requires:
+ table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
+
+ dates_table = Table(*table_args)
+ gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+
+ for col in dates_table.c:
+ index = int(col.name[1:])
+ testing.eq_(gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
+ self.assert_(repr(col))
+
+        dates_table.create(checkfirst=True)
+
+ reflected_dates = Table('test_mssql_dates', MetaData(testing.db), autoload=True)
+ for col in reflected_dates.c:
+ index = int(col.name[1:])
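+            # the expected reflected type is the spec's optional
+            # sixth element, falling back to the declared type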
+ testing.eq_(testing.db.dialect.type_descriptor(col.type).__class__,
+ len(columns[index]) > 5 and columns[index][5] or columns[index][0])
+ dates_table.drop()
+
+ def test_dates2(self):
+ meta = MetaData(testing.db)
+ t = Table('test_dates', meta,
+ Column('id', Integer,
+ Sequence('datetest_id_seq', optional=True),
+ primary_key=True),
+ Column('adate', Date),
+ Column('atime', Time),
+ Column('adatetime', DateTime))
+ t.create(checkfirst=True)
+ try:
+ d1 = datetime.date(2007, 10, 30)
+ t1 = datetime.time(11, 2, 32)
+ d2 = datetime.datetime(2007, 10, 30, 11, 2, 32)
+ t.insert().execute(adate=d1, adatetime=d2, atime=t1)
+ t.insert().execute(adate=d2, adatetime=d2, atime=d2)
+
+ x = t.select().execute().fetchall()[0]
+ self.assert_(x.adate.__class__ == datetime.date)
+ self.assert_(x.atime.__class__ == datetime.time)
+ self.assert_(x.adatetime.__class__ == datetime.datetime)
+
+ t.delete().execute()
+
+ t.insert().execute(adate=d1, adatetime=d2, atime=t1)
+
+ self.assertEquals(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate==d1).execute().fetchall(), [(d1, t1, d2)])
+
+ finally:
+ t.drop(checkfirst=True)
+
def test_binary(self):
"Exercise type specification for binary types."