from sqlalchemy.dialects.mssql.base import MS_2008_VERSION
from sqlalchemy.dialects.mssql.base import TIME
from sqlalchemy.sql import sqltypes
+from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import AssertsExecutionResults
for col in reflected_dates.c:
self.assert_types_base(col, dates_table.c[col.key])
- def test_date_roundtrip(self):
+ @testing.metadata_fixture()
+ def date_fixture(self, metadata):
t = Table(
"test_dates",
metadata,
- Column(
- "id",
- Integer,
- Sequence("datetest_id_seq", optional=True),
- primary_key=True,
- ),
Column("adate", Date),
- Column("atime", Time),
+ Column("atime1", Time),
+ Column("atime2", Time),
Column("adatetime", DateTime),
Column("adatetimeoffset", DATETIMEOFFSET),
)
- metadata.create_all()
+
d1 = datetime.date(2007, 10, 30)
t1 = datetime.time(11, 2, 32)
d2 = datetime.datetime(2007, 10, 30, 11, 2, 32)
- dto = datetime.datetime(
- 2007,
- 10,
- 30,
- 11,
- 2,
- 32,
- 123456,
- util.timezone(datetime.timedelta(hours=1)),
- )
- t.insert().execute(
- adate=d1, adatetime=d2, atime=t1, adatetimeoffset=dto
- )
-
- # NOTE: this previously passed 'd2' for "adate" even though
- # "adate" is a date column; we asserted that it truncated w/o issue.
- # As of pyodbc 4.0.22, this is no longer accepted, was accepted
- # in 4.0.21. See also the new pyodbc assertions regarding numeric
- # precision.
- t.insert().execute(
- adate=d1, adatetime=d2, atime=d2, adatetimeoffset=dto
- )
+ return t, (d1, t1, d2)
- x = t.select().execute().fetchall()[0]
- self.assert_(x.adate.__class__ == datetime.date)
- self.assert_(x.atime.__class__ == datetime.time)
- self.assert_(x.adatetime.__class__ == datetime.datetime)
- self.assert_(x.adatetimeoffset.__class__ == datetime.datetime)
+ def test_date_roundtrips(self, date_fixture):
+ t, (d1, t1, d2) = date_fixture
+ with testing.db.begin() as conn:
+ conn.execute(
+ t.insert(), adate=d1, adatetime=d2, atime1=t1, atime2=d2
+ )
- t.delete().execute()
+ row = conn.execute(t.select()).first()
+ eq_(
+ (row.adate, row.adatetime, row.atime1, row.atime2),
+ (d1, d2, t1, d2.time()),
+ )
- t.insert().execute(
- adate=d1, adatetime=d2, atime=t1, adatetimeoffset=dto
+ @testing.metadata_fixture()
+ def datetimeoffset_fixture(self, metadata):
+ t = Table(
+ "test_dates", metadata, Column("adatetimeoffset", DATETIMEOFFSET),
)
- eq_(
- select(
- [t.c.adate, t.c.atime, t.c.adatetime, t.c.adatetimeoffset],
- t.c.adate == d1,
+ return t
+
+ @testing.combinations(
+ ("dto_param_none", None, None, False),
+ (
+ "dto_param_datetime_aware_positive",
+ datetime.datetime(
+ 2007,
+ 10,
+ 30,
+ 11,
+ 2,
+ 32,
+ 123456,
+ util.timezone(datetime.timedelta(hours=1)),
+ ),
+ 1,
+ False,
+ ),
+ (
+ "dto_param_datetime_aware_negative",
+ datetime.datetime(
+ 2007,
+ 10,
+ 30,
+ 11,
+ 2,
+ 32,
+ 123456,
+ util.timezone(datetime.timedelta(hours=-5)),
+ ),
+ -5,
+ False,
+ ),
+ (
+ "dto_param_datetime_aware_seconds_frac_fail",
+ datetime.datetime(
+ 2007,
+ 10,
+ 30,
+ 11,
+ 2,
+ 32,
+ 123456,
+ util.timezone(datetime.timedelta(seconds=4000)),
+ ),
+ None,
+ True,
+ testing.requires.python3,
+ ),
+ (
+ "dto_param_datetime_naive",
+ datetime.datetime(2007, 10, 30, 11, 2, 32, 123456),
+ 0,
+ False,
+ ),
+ (
+ "dto_param_string_one",
+ "2007-10-30 11:02:32.123456 +01:00",
+ 1,
+ False,
+ ),
+ # wow
+ ("dto_param_string_two", "October 30, 2007 11:02:32.123456", 0, False),
+ ("dto_param_string_invalid", "this is not a date", 0, True),
+ id_="iaaa",
+ argnames="dto_param_value, expected_offset_hours, should_fail",
+ )
+ def test_datetime_offset(
+ self,
+ datetimeoffset_fixture,
+ dto_param_value,
+ expected_offset_hours,
+ should_fail,
+ ):
+ t = datetimeoffset_fixture
+ with testing.db.begin() as conn:
+ if should_fail:
+ assert_raises(
+ sa.exc.DBAPIError,
+ conn.execute,
+ t.insert(),
+ adatetimeoffset=dto_param_value,
+ )
+ return
+
+ conn.execute(
+ t.insert(), adatetimeoffset=dto_param_value,
)
- .execute()
- .fetchall(),
- [(d1, t1, d2, dto)],
- )
+
+ row = conn.execute(t.select()).first()
+
+ if dto_param_value is None:
+ is_(row.adatetimeoffset, None)
+ else:
+ eq_(
+ row.adatetimeoffset,
+ datetime.datetime(
+ 2007,
+ 10,
+ 30,
+ 11,
+ 2,
+ 32,
+ 123456,
+ util.timezone(
+ datetime.timedelta(hours=expected_offset_hours)
+ ),
+ ),
+ )
@emits_warning_on("mssql+mxodbc", r".*does not have any indexes.*")
@testing.provide_metadata