From: Mike Bayer Date: Fri, 19 Sep 2008 22:59:28 +0000 (+0000) Subject: - Overhauled SQLite date/time bind/result processing X-Git-Tag: rel_0_5rc2~35 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8d2fd5f87aaf837042d2dc5e692f84deb7dd1710;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - Overhauled SQLite date/time bind/result processing to use regular expressions and format strings, rather than strptime/strftime, to generically support pre-1900 dates, dates with microseconds. [ticket:968] --- diff --git a/CHANGES b/CHANGES index 6ec64c9c9a..35b8a8d28e 100644 --- a/CHANGES +++ b/CHANGES @@ -21,6 +21,13 @@ CHANGES - column.in_(someselect) can now be used as a columns-clause expression without the subquery bleeding into the FROM clause [ticket:1074] + +- sqlite + - Overhauled SQLite date/time bind/result processing + to use regular expressions and format strings, rather + than strptime/strftime, to generically support + pre-1900 dates, dates with microseconds. [ticket:968] + 0.5.0rc1 ======== diff --git a/lib/sqlalchemy/databases/sqlite.py b/lib/sqlalchemy/databases/sqlite.py index 3840cb64cc..1486c9f360 100644 --- a/lib/sqlalchemy/databases/sqlite.py +++ b/lib/sqlalchemy/databases/sqlite.py @@ -54,76 +54,68 @@ class SLSmallInteger(sqltypes.Smallinteger): return "SMALLINT" class DateTimeMixin(object): - __format__ = "%Y-%m-%d %H:%M:%S" - __legacy_microseconds__ = False - - def bind_processor(self, dialect): + def _bind_processor(self, format, element_range): + elem = ("year", "month", "day", "hour", + "minute", "second", "microsecond")[element_range[0]:element_range[1]] def process(value): if not isinstance(value, (NoneType, datetime.date, datetime.datetime, datetime.time)): - raise TypeError("SQLite Date, Time, and DateTime types only accept Python datetime objects as input.r") + raise TypeError("SQLite Date, Time, and DateTime types only accept Python datetime objects as input.") elif value is not None: - if self.__microsecond__ and getattr(value, 'microsecond', None) is not None: - if self.__legacy_microseconds__: - return value.strftime(self.__format__ + '.' + str(value.microsecond)) - else: - return value.strftime(self.__format__ + ('.%06d' % value.microsecond)) - else: - return value.strftime(self.__format__) + return format % tuple([getattr(value, attr, 0) for attr in elem]) else: return None return process - def _cvt(self, value, dialect): - if value is None: - return None - try: - (value, microsecond) = value.split('.') - if self.__legacy_microseconds__: - microsecond = int(microsecond) + def _result_processor(self, fn, regexp): + def process(value): + if value is not None: + return fn(*[int(x or 0) for x in regexp.match(value).groups()]) else: - microsecond = int((microsecond + '000000')[0:6]) - except ValueError: - microsecond = 0 - return time.strptime(value, self.__format__)[0:6] + (microsecond,) + return None + return process class SLDateTime(DateTimeMixin, sqltypes.DateTime): - __format__ = "%Y-%m-%d %H:%M:%S" - __microsecond__ = True + __legacy_microseconds__ = False def get_col_spec(self): return "TIMESTAMP" + def bind_processor(self, dialect): + if self.__legacy_microseconds__: + return self._bind_processor("%4.4d-%2.2d-%2.2d %2.2d:%2.2d:%2.2d.%s", (0, 7)) + else: + return self._bind_processor("%4.4d-%2.2d-%2.2d %2.2d:%2.2d:%2.2d.%06d", (0, 7)) + + _reg = re.compile(r"(\d+)-(\d+)-(\d+)(?: (\d+):(\d+):(\d+)(?:\.(\d+))?)?") def result_processor(self, dialect): - def process(value): - tup = self._cvt(value, dialect) - return tup and datetime.datetime(*tup) - return process + return self._result_processor(datetime.datetime, self._reg) class SLDate(DateTimeMixin, sqltypes.Date): - __format__ = "%Y-%m-%d" - __microsecond__ = False - def get_col_spec(self): return "DATE" + def bind_processor(self, dialect): + return self._bind_processor("%4.4d-%2.2d-%2.2d", (0, 3)) + + _reg = re.compile(r"(\d+)-(\d+)-(\d+)") def result_processor(self, dialect): - def process(value): - tup = self._cvt(value, dialect) - return tup and datetime.date(*tup[0:3]) - return process + return self._result_processor(datetime.date, self._reg) class SLTime(DateTimeMixin, sqltypes.Time): - __format__ = "%H:%M:%S" - __microsecond__ = True + __legacy_microseconds__ = False def get_col_spec(self): return "TIME" + def bind_processor(self, dialect): + if self.__legacy_microseconds__: + return self._bind_processor("%2.2d:%2.2d:%2.2d.%s", (3, 7)) + else: + return self._bind_processor("%2.2d:%2.2d:%2.2d.%06d", (3, 7)) + + _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d+))?") def result_processor(self, dialect): - def process(value): - tup = self._cvt(value, dialect) - return tup and datetime.time(*tup[3:7]) - return process + return self._result_processor(datetime.time, self._reg) class SLText(sqltypes.Text): def get_col_spec(self): diff --git a/test/dialect/sqlite.py b/test/dialect/sqlite.py index 8c9d03d975..2c5f32ff92 100644 --- a/test/dialect/sqlite.py +++ b/test/dialect/sqlite.py @@ -25,6 +25,7 @@ class TestTypes(TestBase, AssertsExecutionResults): self.assertEquals(rp(bp(dt)), dt) sldt.__legacy_microseconds__ = True + bp = sldt.bind_processor(None) self.assertEquals(bp(dt), '2008-06-27 12:00:00.125') self.assertEquals(rp(bp(dt)), dt) diff --git a/test/sql/testtypes.py b/test/sql/testtypes.py index 8642c4fa3a..2ed8f37d75 100644 --- a/test/sql/testtypes.py +++ b/test/sql/testtypes.py @@ -521,19 +521,19 @@ class DateTest(TestBase, AssertsExecutionResults): if testing.against('oracle'): import sqlalchemy.databases.oracle as oracle insert_data = [ - [7, 'jack', + (7, 'jack', datetime.datetime(2005, 11, 10, 0, 0), datetime.date(2005,11,10), - datetime.datetime(2005, 11, 10, 0, 0, 0, 29384)], - [8, 'roy', + datetime.datetime(2005, 11, 10, 0, 0, 0, 29384)), + (8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35), datetime.date(2005,10,10), - datetime.datetime(2006, 5, 10, 15, 32, 47, 6754)], - [9, 'foo', + datetime.datetime(2006, 5, 10, 15, 32, 47, 6754)), + (9, 'foo', datetime.datetime(2006, 11, 10, 11, 52, 35), datetime.date(1970,4,1), - datetime.datetime(2004, 9, 18, 4, 0, 52, 1043)], - [10, 'colber', None, None, None] + datetime.datetime(2004, 9, 18, 4, 0, 52, 1043)), + (10, 'colber', None, None, None), ] fnames = ['user_id', 'user_name', 'user_datetime', 'user_date', 'user_time'] @@ -555,20 +555,22 @@ class DateTest(TestBase, AssertsExecutionResults): time_micro = 0 insert_data = [ - [7, 'jack', + (7, 'jack', datetime.datetime(2005, 11, 10, 0, 0), datetime.date(2005, 11, 10), - datetime.time(12, 20, 2)], - [8, 'roy', + datetime.time(12, 20, 2)), + (8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35), datetime.date(2005, 10, 10), - datetime.time(0, 0, 0)], - [9, 'foo', + datetime.time(0, 0, 0)), + (9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35, datetime_micro), datetime.date(1970, 4, 1), - datetime.time(23, 59, 59, time_micro)], - [10, 'colber', None, None, None] + datetime.time(23, 59, 59, time_micro)), + (10, 'colber', None, None, None), ] + + fnames = ['user_id', 'user_name', 'user_datetime', 'user_date', 'user_time'] @@ -578,6 +580,14 @@ class DateTest(TestBase, AssertsExecutionResults): Column('user_date', Date), Column('user_time', Time)] + if testing.against('sqlite', 'postgres'): + insert_data.append( + (11, 'historic', + datetime.datetime(1850, 11, 10, 11, 52, 35, datetime_micro), + datetime.date(1727,4,1), + None), + ) + users_with_date = Table('query_users_with_date', MetaData(testing.db), *collist) users_with_date.create() @@ -592,7 +602,7 @@ class DateTest(TestBase, AssertsExecutionResults): def testdate(self): global insert_data - l = map(list, users_with_date.select().execute().fetchall()) + l = map(tuple, users_with_date.select().execute().fetchall()) self.assert_(l == insert_data, 'DateTest mismatch: got:%s expected:%s' % (l, insert_data))