def get_col_spec(self):
return "DATETIME"
- def convert_bind_param(self, value, dialect):
- if hasattr(value, "isoformat"):
- #return value.isoformat(' ')
- # isoformat() bings on apodbapi -- reported/suggested by Peter Buschman
- return value.strftime('%Y-%m-%d %H:%M:%S')
- else:
- return value
+class MSDate(sqltypes.Date):
+ def __init__(self, *a, **kw):
+ super(MSDate, self).__init__(False)
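+    # SQL Server has no standalone DATE column type prior to the 2008 release,
+    # so Date columns are rendered here as SMALLDATETIME (minute precision).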
+ def get_col_spec(self):
+ return "SMALLDATETIME"
+
+class MSDateTime_adodbapi(MSDateTime):
def convert_result_value(self, value, dialect):
# adodbapi will return datetimes with empty time values as datetime.date() objects.
# Promote them back to full datetime.datetime()
        if type(value) is datetime.date:
            return datetime.datetime(value.year, value.month, value.day)
        return value
-class MSDate(sqltypes.Date):
- def __init__(self, *a, **kw):
- super(MSDate, self).__init__(False)
+class MSDateTime_pyodbc(MSDateTime):
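+    # Bind values without a time component (plain datetime.date objects have no
+    # .second attribute) are widened to midnight datetimes below, since pyodbc
+    # does not appear to accept date objects for DATETIME parameters.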
+ def convert_bind_param(self, value, dialect):
+ if value and not hasattr(value, 'second'):
+ return datetime.datetime(value.year, value.month, value.day)
+ else:
+ return value
- def get_col_spec(self):
- return "SMALLDATETIME"
-
+class MSDate_pyodbc(MSDate):
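+    # Illustrative round trip for a Date column under pyodbc:
+    #   bind:   datetime.date(2007, 10, 30)           -> datetime.datetime(2007, 10, 30, 0, 0)
+    #   result: datetime.datetime(2007, 10, 30, 0, 0) -> datetime.date(2007, 10, 30)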
def convert_bind_param(self, value, dialect):
- if value and hasattr(value, "isoformat"):
- return value.strftime('%Y-%m-%d %H:%M')
- return value
+ if value and not hasattr(value, 'second'):
+ return datetime.datetime(value.year, value.month, value.day)
+ else:
+ return value
+ def convert_result_value(self, value, dialect):
+        # pyodbc returns SMALLDATETIME values as datetime.datetime(); truncate them back to datetime.date()
+ if value and hasattr(value, 'second'):
+ return value.date()
+ else:
+ return value
+
+class MSDate_pymssql(MSDate):
def convert_result_value(self, value, dialect):
# pymssql will return SMALLDATETIME values as datetime.datetime(), truncate it back to datetime.date()
if value and hasattr(value, 'second'):
return value.date()
- return value
+ else:
+ return value
class MSText(sqltypes.TEXT):
    def get_col_spec(self):
        return "TEXT"

class MSString(sqltypes.String):
    def get_col_spec(self):
        return "VARCHAR(%(length)s)" % {'length' : self.length}
-class MSNVarchar(MSString):
+class MSNVarchar(sqltypes.Unicode):
def get_col_spec(self):
if self.length:
return "NVARCHAR(%(length)s)" % {'length' : self.length}
else:
            return "NTEXT"
+class MSTimeStamp(sqltypes.TIMESTAMP):
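+    # MSSQL's TIMESTAMP is an automatically updated rowversion value, not a
+    # wall-clock date/time; mapping it here mainly lets reflected 'timestamp'
+    # columns resolve to a known type.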
+ def get_col_spec(self):
+ return "TIMESTAMP"
+
def descriptor():
return {'name':'mssql',
'description':'MSSQL',
if self.IINSERT:
# TODO: quoting rules for table name here ?
- self.cursor.execute("SET IDENTITY_INSERT %s ON" % self.compiled.statement.table.name)
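+            # table.fullname is schema-qualified (e.g. "dbo.mytable"), so
+            # IDENTITY_INSERT targets the right table when it lives outside the
+            # default schema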
+ self.cursor.execute("SET IDENTITY_INSERT %s ON" % self.compiled.statement.table.fullname)
super(MSSQLExecutionContext, self).pre_exec()
if self.compiled.isinsert:
if self.IINSERT:
# TODO: quoting rules for table name here ?
- self.cursor.execute("SET IDENTITY_INSERT %s OFF" % self.compiled.statement.table.name)
+ self.cursor.execute("SET IDENTITY_INSERT %s OFF" % self.compiled.statement.table.fullname)
self.IINSERT = False
elif self.HASIDENT:
if self.dialect.use_scope_identity:
sqltypes.TEXT : MSText,
sqltypes.CHAR: MSChar,
sqltypes.NCHAR: MSNChar,
+ sqltypes.TIMESTAMP: MSTimeStamp,
}
ischema_names = {
'binary' : MSBinary,
'bit': MSBoolean,
'real' : MSFloat,
- 'image' : MSBinary
+ 'image' : MSBinary,
+ 'timestamp': MSTimeStamp,
}
def __new__(cls, dbapi=None, *args, **kwargs):
super(MSSQLDialect, self).__init__(**params)
self.auto_identity_insert = auto_identity_insert
self.text_as_varchar = False
- self.use_scope_identity = True
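+        # when enabled, the last inserted identity is fetched via SCOPE_IDENTITY()
+        # (scoped to the current statement) instead of @@IDENTITY, which triggers can
+        # skew; default it to off here and re-enable it per-DBAPI where it is known
+        # to work (see the pymssql dialect below)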
+ self.use_scope_identity = False
self.set_default_schema_name("dbo")
def dbapi(cls, module_name=None):
return module
import_dbapi = classmethod(import_dbapi)
+ colspecs = MSSQLDialect.colspecs.copy()
+ colspecs[sqltypes.Date] = MSDate_pymssql
+
+ ischema_names = MSSQLDialect.ischema_names.copy()
+ ischema_names['smalldatetime'] = MSDate_pymssql
+
+ def __init__(self, **params):
+ super(MSSQLDialect_pymssql, self).__init__(**params)
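+        # pymssql is treated as able to return SCOPE_IDENTITY() after inserts,
+        # so identity retrieval via scope_identity is switched back on here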
+ self.use_scope_identity = True
+
def supports_sane_rowcount(self):
return True
colspecs = MSSQLDialect.colspecs.copy()
colspecs[sqltypes.Unicode] = AdoMSNVarchar
+ colspecs[sqltypes.Date] = MSDate_pyodbc
+ colspecs[sqltypes.DateTime] = MSDateTime_pyodbc
+
ischema_names = MSSQLDialect.ischema_names.copy()
ischema_names['nvarchar'] = AdoMSNVarchar
+ ischema_names['smalldatetime'] = MSDate_pyodbc
+ ischema_names['datetime'] = MSDateTime_pyodbc
def supports_sane_rowcount(self):
return False
+ def supports_unicode_statements(self):
+ """indicate whether the DBAPI can receive SQL statements as Python unicode strings"""
+ return True
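+        # i.e. the engine may pass SQL text to pyodbc as Python unicode without
+        # encoding it to a byte string first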
+
def make_connect_string(self, keys):
connectors = ["Driver={SQL Server}"]
connectors.append("Server=%s" % keys.get("host"))
colspecs = MSSQLDialect.colspecs.copy()
colspecs[sqltypes.Unicode] = AdoMSNVarchar
+ colspecs[sqltypes.DateTime] = MSDateTime_adodbapi
+
ischema_names = MSSQLDialect.ischema_names.copy()
ischema_names['nvarchar'] = AdoMSNVarchar
+ ischema_names['datetime'] = MSDateTime_adodbapi
def supports_sane_rowcount(self):
return True
+ def supports_unicode_statements(self):
+ """indicate whether the DBAPI can receive SQL statements as Python unicode strings"""
+ return True
+
def make_connect_string(self, keys):
connectors = ["Provider=SQLOLEDB"]
if 'port' in keys:
finally:
db.engine.dialect.convert_unicode = prev_unicode
+ def testlength(self):
+ """checks the database correctly understands the length of a unicode string"""
+        teststr = u'aaa\u1234'
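+        # server-side LEN() counts characters, so it should agree with Python's
+        # len() even when the string contains a non-ASCII character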
+ self.assert_(db.func.length(teststr).scalar() == len(teststr))
+
class BinaryTest(AssertMixin):
def setUpAll(self):
global binary_table
#x = db.text("select * from query_users_with_date where user_datetime=:date", bindparams=[bindparam('date', )]).execute(date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall()
#print repr(x)
+ @testbase.unsupported('sqlite')
+ def testdate2(self):
+ t = Table('testdate', testbase.metadata, Column('id', Integer, primary_key=True),
+ Column('adate', Date), Column('adatetime', DateTime))
+ t.create()
+ try:
+ d1 = datetime.date(2007, 10, 30)
+ t.insert().execute(adate=d1, adatetime=d1)
+ d2 = datetime.datetime(2007, 10, 30)
+ t.insert().execute(adate=d2, adatetime=d2)
+
+ x = t.select().execute().fetchall()[0]
+ self.assert_(x.adate.__class__ == datetime.date)
+ self.assert_(x.adatetime.__class__ == datetime.datetime)
+
+ finally:
+ t.drop()
+
class TimezoneTest(AssertMixin):
"""test timezone-aware datetimes. psycopg will return a datetime with a tzinfo attached to it,
if postgres returns it. python then will not let you compare a datetime with a tzinfo to a datetime