return "SMALLINT"
class DateTimeMixin(object):
- __format__ = "%Y-%m-%d %H:%M:%S"
- __legacy_microseconds__ = False
-
- def bind_processor(self, dialect):
+ def _bind_processor(self, format, element_range):
+ elem = ("year", "month", "day", "hour",
+ "minute", "second", "microsecond")[element_range[0]:element_range[1]]
def process(value):
if not isinstance(value, (NoneType, datetime.date, datetime.datetime, datetime.time)):
- raise TypeError("SQLite Date, Time, and DateTime types only accept Python datetime objects as input.r")
+ raise TypeError("SQLite Date, Time, and DateTime types only accept Python datetime objects as input.")
elif value is not None:
- if self.__microsecond__ and getattr(value, 'microsecond', None) is not None:
- if self.__legacy_microseconds__:
- return value.strftime(self.__format__ + '.' + str(value.microsecond))
- else:
- return value.strftime(self.__format__ + ('.%06d' % value.microsecond))
- else:
- return value.strftime(self.__format__)
+ return format % tuple([getattr(value, attr, 0) for attr in elem])
else:
return None
return process
- def _cvt(self, value, dialect):
- if value is None:
- return None
- try:
- (value, microsecond) = value.split('.')
- if self.__legacy_microseconds__:
- microsecond = int(microsecond)
+ def _result_processor(self, fn, regexp):
+ def process(value):
+ if value is not None:
+ return fn(*[int(x or 0) for x in regexp.match(value).groups()])
else:
- microsecond = int((microsecond + '000000')[0:6])
- except ValueError:
- microsecond = 0
- return time.strptime(value, self.__format__)[0:6] + (microsecond,)
+ return None
+ return process
class SLDateTime(DateTimeMixin, sqltypes.DateTime):
- __format__ = "%Y-%m-%d %H:%M:%S"
- __microsecond__ = True
+ __legacy_microseconds__ = False
def get_col_spec(self):
return "TIMESTAMP"
+ def bind_processor(self, dialect):
+ if self.__legacy_microseconds__:
+ return self._bind_processor("%4.4d-%2.2d-%2.2d %2.2d:%2.2d:%2.2d.%s", (0, 7))
+ else:
+ return self._bind_processor("%4.4d-%2.2d-%2.2d %2.2d:%2.2d:%2.2d.%06d", (0, 7))
+
+ _reg = re.compile(r"(\d+)-(\d+)-(\d+)(?: (\d+):(\d+):(\d+)(?:\.(\d+))?)?")
def result_processor(self, dialect):
- def process(value):
- tup = self._cvt(value, dialect)
- return tup and datetime.datetime(*tup)
- return process
+ return self._result_processor(datetime.datetime, self._reg)
class SLDate(DateTimeMixin, sqltypes.Date):
- __format__ = "%Y-%m-%d"
- __microsecond__ = False
-
def get_col_spec(self):
return "DATE"
+ def bind_processor(self, dialect):
+ return self._bind_processor("%4.4d-%2.2d-%2.2d", (0, 3))
+
+ _reg = re.compile(r"(\d+)-(\d+)-(\d+)")
def result_processor(self, dialect):
- def process(value):
- tup = self._cvt(value, dialect)
- return tup and datetime.date(*tup[0:3])
- return process
+ return self._result_processor(datetime.date, self._reg)
class SLTime(DateTimeMixin, sqltypes.Time):
- __format__ = "%H:%M:%S"
- __microsecond__ = True
+ __legacy_microseconds__ = False
def get_col_spec(self):
return "TIME"
+ def bind_processor(self, dialect):
+ if self.__legacy_microseconds__:
+ return self._bind_processor("%2.2d:%2.2d:%2.2d.%s", (3, 7))
+ else:
+ return self._bind_processor("%2.2d:%2.2d:%2.2d.%06d", (3, 7))
+
+ _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d+))?")
def result_processor(self, dialect):
- def process(value):
- tup = self._cvt(value, dialect)
- return tup and datetime.time(*tup[3:7])
- return process
+ return self._result_processor(datetime.time, self._reg)
class SLText(sqltypes.Text):
def get_col_spec(self):
if testing.against('oracle'):
import sqlalchemy.databases.oracle as oracle
insert_data = [
- [7, 'jack',
+ (7, 'jack',
datetime.datetime(2005, 11, 10, 0, 0),
datetime.date(2005,11,10),
- datetime.datetime(2005, 11, 10, 0, 0, 0, 29384)],
- [8, 'roy',
+ datetime.datetime(2005, 11, 10, 0, 0, 0, 29384)),
+ (8, 'roy',
datetime.datetime(2005, 11, 10, 11, 52, 35),
datetime.date(2005,10,10),
- datetime.datetime(2006, 5, 10, 15, 32, 47, 6754)],
- [9, 'foo',
+ datetime.datetime(2006, 5, 10, 15, 32, 47, 6754)),
+ (9, 'foo',
datetime.datetime(2006, 11, 10, 11, 52, 35),
datetime.date(1970,4,1),
- datetime.datetime(2004, 9, 18, 4, 0, 52, 1043)],
- [10, 'colber', None, None, None]
+ datetime.datetime(2004, 9, 18, 4, 0, 52, 1043)),
+ (10, 'colber', None, None, None),
]
fnames = ['user_id', 'user_name', 'user_datetime',
'user_date', 'user_time']
time_micro = 0
insert_data = [
- [7, 'jack',
+ (7, 'jack',
datetime.datetime(2005, 11, 10, 0, 0),
datetime.date(2005, 11, 10),
- datetime.time(12, 20, 2)],
- [8, 'roy',
+ datetime.time(12, 20, 2)),
+ (8, 'roy',
datetime.datetime(2005, 11, 10, 11, 52, 35),
datetime.date(2005, 10, 10),
- datetime.time(0, 0, 0)],
- [9, 'foo',
+ datetime.time(0, 0, 0)),
+ (9, 'foo',
datetime.datetime(2005, 11, 10, 11, 52, 35, datetime_micro),
datetime.date(1970, 4, 1),
- datetime.time(23, 59, 59, time_micro)],
- [10, 'colber', None, None, None]
+ datetime.time(23, 59, 59, time_micro)),
+ (10, 'colber', None, None, None),
]
+
+
fnames = ['user_id', 'user_name', 'user_datetime',
'user_date', 'user_time']
Column('user_date', Date),
Column('user_time', Time)]
+ if testing.against('sqlite', 'postgres'):
+ insert_data.append(
+ (11, 'historic',
+ datetime.datetime(1850, 11, 10, 11, 52, 35, datetime_micro),
+ datetime.date(1727,4,1),
+ None),
+ )
+
users_with_date = Table('query_users_with_date',
MetaData(testing.db), *collist)
users_with_date.create()
def testdate(self):
global insert_data
- l = map(list, users_with_date.select().execute().fetchall())
+ l = map(tuple, users_with_date.select().execute().fetchall())
self.assert_(l == insert_data,
'DateTest mismatch: got:%s expected:%s' % (l, insert_data))