From e416129179d58879ac1d3facc48a4c640e78c05d Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sat, 4 Feb 2006 16:44:00 +0000 Subject: [PATCH] Rick Morrison's patch adding Smallint, Date, and Time support ! --- lib/sqlalchemy/databases/mysql.py | 15 ++++++ lib/sqlalchemy/databases/oracle.py | 10 ++++ lib/sqlalchemy/databases/postgres.py | 49 ++++++++++++++++-- lib/sqlalchemy/databases/sqlite.py | 35 +++++++++++-- lib/sqlalchemy/types.py | 16 +++++- test/testbase.py | 42 +++++++-------- test/types.py | 76 +++++++++++++++++++--------- 7 files changed, 190 insertions(+), 53 deletions(-) diff --git a/lib/sqlalchemy/databases/mysql.py b/lib/sqlalchemy/databases/mysql.py index fda41a3fc4..268639b690 100644 --- a/lib/sqlalchemy/databases/mysql.py +++ b/lib/sqlalchemy/databases/mysql.py @@ -28,9 +28,18 @@ class MSFloat(sqltypes.Float): class MSInteger(sqltypes.Integer): def get_col_spec(self): return "INTEGER" +class MSSmallInteger(sqltypes.Smallinteger): + def get_col_spec(self): + return "SMALLINT" class MSDateTime(sqltypes.DateTime): def get_col_spec(self): return "DATETIME" +class MSDate(sqltypes.Date): + def get_col_spec(self): + return "DATE" +class MSTime(sqltypes.Time): + def get_col_spec(self): + return "TIME" class MSText(sqltypes.TEXT): def get_col_spec(self): return "TEXT" @@ -54,9 +63,12 @@ class MSBoolean(sqltypes.Boolean): colspecs = { sqltypes.Integer : MSInteger, + sqltypes.Smallinteger : MSSmallInteger, sqltypes.Numeric : MSNumeric, sqltypes.Float : MSFloat, sqltypes.DateTime : MSDateTime, + sqltypes.Date : MSDate, + sqltypes.Time : MSTime, sqltypes.String : MSString, sqltypes.Binary : MSBinary, sqltypes.Boolean : MSBoolean, @@ -66,6 +78,7 @@ colspecs = { ischema_names = { 'int' : MSInteger, + 'smallint' : MSSmallInteger, 'varchar' : MSString, 'char' : MSChar, 'text' : MSText, @@ -73,6 +86,8 @@ ischema_names = { 'float' : MSFloat, 'timestamp' : MSDateTime, 'datetime' : MSDateTime, + 'date' : MSDate, + 'time' : MSTime, 'binary' : MSBinary, 'blob' : MSBinary, } diff --git a/lib/sqlalchemy/databases/oracle.py b/lib/sqlalchemy/databases/oracle.py index c3214c669e..de4a76c3ef 100644 --- a/lib/sqlalchemy/databases/oracle.py +++ b/lib/sqlalchemy/databases/oracle.py @@ -24,9 +24,17 @@ class OracleNumeric(sqltypes.Numeric): class OracleInteger(sqltypes.Integer): def get_col_spec(self): return "INTEGER" +class OracleSmallInteger(sqltypes.Smallinteger): + def get_col_spec(self): + return "SMALLINT" class OracleDateTime(sqltypes.DateTime): def get_col_spec(self): return "DATE" +# Note: +# Oracle DATE == DATETIME +# Oracle does not allow milliseconds in DATE +# Oracle does not support TIME columns + class OracleText(sqltypes.TEXT): def get_col_spec(self): return "CLOB" @@ -45,8 +53,10 @@ class OracleBoolean(sqltypes.Boolean): colspecs = { sqltypes.Integer : OracleInteger, + sqltypes.Smallinteger : OracleSmallInteger, sqltypes.Numeric : OracleNumeric, sqltypes.DateTime : OracleDateTime, + sqltypes.Date : OracleDateTime, sqltypes.String : OracleString, sqltypes.Binary : OracleBinary, sqltypes.Boolean : OracleBoolean, diff --git a/lib/sqlalchemy/databases/postgres.py b/lib/sqlalchemy/databases/postgres.py index 3a29186842..2d6adc165b 100644 --- a/lib/sqlalchemy/databases/postgres.py +++ b/lib/sqlalchemy/databases/postgres.py @@ -33,6 +33,9 @@ class PGFloat(sqltypes.Float): class PGInteger(sqltypes.Integer): def get_col_spec(self): return "INTEGER" +class PGSmallInteger(sqltypes.Smallinteger): + def get_col_spec(self): + return "SMALLINT" class PG2DateTime(sqltypes.DateTime): def get_col_spec(self): return "TIMESTAMP" @@ -46,6 +49,32 @@ class PG1DateTime(sqltypes.DateTime): return value def get_col_spec(self): return "TIMESTAMP" +class PG2Date(sqltypes.Date): + def get_col_spec(self): + return "DATE" +class PG1Date(sqltypes.Date): + def convert_bind_param(self, value, engine): + # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime + # this one doesnt seem to work with the "emulation" mode + return psycopg.DateFromMx(value) + def convert_result_value(self, value, engine): + # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime + return value + def get_col_spec(self): + return "DATE" +class PG2Time(sqltypes.Date): + def get_col_spec(self): + return "TIME" +class PG1Time(sqltypes.Date): + def convert_bind_param(self, value, engine): + # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime + # this one doesnt seem to work with the "emulation" mode + return psycopg.TimeFromMx(value) + def convert_result_value(self, value, engine): + # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime + return value + def get_col_spec(self): + return "TIME" class PGText(sqltypes.TEXT): def get_col_spec(self): return "TEXT" @@ -64,9 +93,12 @@ class PGBoolean(sqltypes.Boolean): pg2_colspecs = { sqltypes.Integer : PGInteger, + sqltypes.Smallinteger : PGSmallInteger, sqltypes.Numeric : PGNumeric, sqltypes.Float : PGFloat, sqltypes.DateTime : PG2DateTime, + sqltypes.Date : PG2Date, + sqltypes.Time : PG2Time, sqltypes.String : PGString, sqltypes.Binary : PGBinary, sqltypes.Boolean : PGBoolean, @@ -74,11 +106,16 @@ pg2_colspecs = { sqltypes.CHAR: PGChar, } pg1_colspecs = pg2_colspecs.copy() -pg1_colspecs[sqltypes.DateTime] = PG1DateTime +pg1_colspecs.update({ + sqltypes.DateTime : PG1DateTime, + sqltypes.Date : PG1Date, + sqltypes.Time : PG1Time + }) pg2_ischema_names = { 'integer' : PGInteger, 'bigint' : PGInteger, + 'smallint' : PGSmallInteger, 'character varying' : PGString, 'character' : PGChar, 'text' : PGText, @@ -88,12 +125,18 @@ pg2_ischema_names = { 'double precision' : PGFloat, 'timestamp with time zone' : PG2DateTime, 'timestamp without time zone' : PG2DateTime, + 'date' : PG2Date, + 'time': PG2Time, 'bytea' : PGBinary, 'boolean' : PGBoolean, } pg1_ischema_names = pg2_ischema_names.copy() -pg1_ischema_names['timestamp with time zone'] = \ - pg1_ischema_names['timestamp without time zone'] = PG1DateTime +pg1_ischema_names.update({ + 'timestamp with time zone' : PG1DateTime, + 'timestamp without time zone' : PG1DateTime, + 'date' : PG1Date, + 'time' : PG1Time + }) def engine(opts, **params): return PGSQLEngine(opts, **params) diff --git a/lib/sqlalchemy/databases/sqlite.py b/lib/sqlalchemy/databases/sqlite.py index 9ada952ca0..a403e3c1a6 100644 --- a/lib/sqlalchemy/databases/sqlite.py +++ b/lib/sqlalchemy/databases/sqlite.py @@ -15,6 +15,8 @@ import sqlalchemy.types as sqltypes from sqlalchemy.ansisql import * import datetime,time +pysqlite2_timesupport = False # Change this if the init.d guys ever get around to supporting time cols + try: from pysqlite2 import dbapi2 as sqlite except: @@ -26,10 +28,13 @@ class SLNumeric(sqltypes.Numeric): class SLInteger(sqltypes.Integer): def get_col_spec(self): return "INTEGER" +class SLSmallInteger(sqltypes.Smallinteger): + def get_col_spec(self): + return "SMALLINT" class SLDateTime(sqltypes.DateTime): def get_col_spec(self): return "TIMESTAMP" - def convert_result_value(self, value, engine): + def _cvt(self, value, engine, fmt): if value is None: return None parts = value.split('.') @@ -38,9 +43,22 @@ class SLDateTime(sqltypes.DateTime): microsecond = int(microsecond) except ValueError: (value, microsecond) = (value, 0) - tup = time.strptime(value, "%Y-%m-%d %H:%M:%S") - return datetime.datetime(microsecond=microsecond, *tup[0:6]) - + return time.strptime(value, fmt)[0:6] + (microsecond,) + def convert_result_value(self, value, engine): + tup = self._cvt(value, engine, "%Y-%m-%d %H:%M:%S") + return tup and datetime.datetime(*tup) +class SLDate(SLDateTime): + def get_col_spec(self): + return "DATE" + def convert_result_value(self, value, engine): + tup = self._cvt(value, engine, "%Y-%m-%d") + return tup and datetime.date(*tup[0:3]) +class SLTime(SLDateTime): + def get_col_spec(self): + return "TIME" + def convert_result_value(self, value, engine): + tup = self._cvt(value, engine, "%H:%M:%S") + return tup and datetime.time(*tup[4:7]) class SLText(sqltypes.TEXT): def get_col_spec(self): return "TEXT" @@ -59,9 +77,11 @@ class SLBoolean(sqltypes.Boolean): colspecs = { sqltypes.Integer : SLInteger, + sqltypes.Smallinteger : SLSmallInteger, sqltypes.Numeric : SLNumeric, sqltypes.Float : SLNumeric, sqltypes.DateTime : SLDateTime, + sqltypes.Date : SLDate, sqltypes.String : SLString, sqltypes.Binary : SLBinary, sqltypes.Boolean : SLBoolean, @@ -71,15 +91,22 @@ colspecs = { pragma_names = { 'INTEGER' : SLInteger, + 'SMALLINT' : SLSmallInteger, 'VARCHAR' : SLString, 'CHAR' : SLChar, 'TEXT' : SLText, 'NUMERIC' : SLNumeric, 'FLOAT' : SLNumeric, 'TIMESTAMP' : SLDateTime, + 'DATETIME' : SLDateTime, + 'DATE' : SLDate, 'BLOB' : SLBinary, } +if pysqlite2_timesupport: + colspecs.update({sqltypes.Time : SLTime}) + pragma_names.update({'TIME' : SLTime}) + def engine(opts, **params): return SQLiteSQLEngine(opts, **params) diff --git a/lib/sqlalchemy/types.py b/lib/sqlalchemy/types.py index defd6819bb..f76455ebd3 100644 --- a/lib/sqlalchemy/types.py +++ b/lib/sqlalchemy/types.py @@ -6,7 +6,8 @@ __all__ = [ 'TypeEngine', 'TypeDecorator', 'NullTypeEngine', 'INT', 'CHAR', 'VARCHAR', 'TEXT', 'FLOAT', 'DECIMAL', - 'TIMESTAMP', 'DATETIME', 'CLOB', 'BLOB', 'BOOLEAN', 'String', 'Integer', 'Numeric', 'Float', 'DateTime', 'Binary', 'Boolean', 'Unicode', 'NULLTYPE' + 'TIMESTAMP', 'DATETIME', 'CLOB', 'BLOB', 'BOOLEAN', 'String', 'Integer', 'Smallinteger', + 'Numeric', 'Float', 'DateTime', 'Date', 'Time', 'Binary', 'Boolean', 'Unicode', 'NULLTYPE' ] import sqlalchemy.util as util @@ -88,6 +89,10 @@ class Integer(NullTypeEngine): # seems to be not needed with SQLite, Postgres pass +class Smallinteger(Integer): + """ smallint datatype """ + pass + class Numeric(NullTypeEngine): def __init__(self, precision = 10, length = 2): self.precision = precision @@ -104,6 +109,12 @@ class Float(NullTypeEngine): class DateTime(NullTypeEngine): pass +class Date(NullTypeEngine): + pass + +class Time(NullTypeEngine): + pass + class Binary(NullTypeEngine): def __init__(self, length=None): self.length = length @@ -122,8 +133,11 @@ class TEXT(String):pass class DECIMAL(Numeric):pass class INT(Integer):pass INTEGER = INT +class SMALLINT(Smallinteger):pass class TIMESTAMP(DateTime): pass class DATETIME(DateTime): pass +class DATE(Date): pass +class TIME(Time): pass class CLOB(String): pass class VARCHAR(String): pass class CHAR(String):pass diff --git a/test/testbase.py b/test/testbase.py index b84c08e273..4802b11855 100644 --- a/test/testbase.py +++ b/test/testbase.py @@ -7,6 +7,7 @@ import sqlalchemy.databases.postgres as postgres #import sqlalchemy.databases.mysql as mysql echo = True +#echo = False #echo = 'debug' db = None db_uri = None @@ -14,31 +15,32 @@ db_uri = None def parse_argv(): # we are using the unittest main runner, so we are just popping out the # arguments we need instead of using our own getopt type of thing + global db, db_uri + + DBTYPE = 'sqlite' + if len(sys.argv) >= 3: - if sys.argv[1] == '--db': + if sys.argv[1] == '--dburi': + (param, db_uri) = (sys.argv.pop(1), sys.argv.pop(1)) + elif sys.argv[1] == '--db': (param, DBTYPE) = (sys.argv.pop(1), sys.argv.pop(1)) - else: - DBTYPE = 'sqlite' - global db, db_uri - if DBTYPE == 'sqlite': - try: + if (None == db_uri): + if DBTYPE == 'sqlite': db_uri = 'sqlite://filename=:memory:' - db = engine.create_engine(db_uri, echo=echo, default_ordering=True) - except: - raise "Could not create sqlite engine. specify --db to test runner." - elif DBTYPE == 'sqlite_file': - db_uri = 'sqlite://filename=querytest.db' - db = engine.create_engine(db_uri, echo=echo, default_ordering=True) - elif DBTYPE == 'postgres': - db_uri = 'postgres://database=test&port=5432&host=127.0.0.1&user=scott&password=tiger' - db = engine.create_engine(db_uri, echo=echo, default_ordering=True) - elif DBTYPE == 'mysql': - db_uri = 'mysql://db=test&host=127.0.0.1&user=scott&passwd=tiger' - db = engine.create_engine(db_uri, echo=echo, default_ordering=True) - elif DBTYPE == 'oracle': - db_uri = 'oracle://user=scott&password=tiger' + elif DBTYPE == 'sqlite_file': + db_uri = 'sqlite://filename=querytest.db' + elif DBTYPE == 'postgres': + db_uri = 'postgres://database=test&port=5432&host=127.0.0.1&user=scott&password=tiger' + elif DBTYPE == 'mysql': + db_uri = 'mysql://db=test&host=127.0.0.1&user=scott&passwd=tiger' + elif DBTYPE == 'oracle': + db_uri = 'oracle://user=scott&password=tiger' + try: db = engine.create_engine(db_uri, echo=echo, default_ordering=True) + except: + raise "Could not create engine. specify --db to test runner." + db = EngineAssert(db) class PersistTest(unittest.TestCase): diff --git a/test/types.py b/test/types.py index 155c4ad3b4..f7b30fd2c1 100644 --- a/test/types.py +++ b/test/types.py @@ -5,8 +5,6 @@ import testbase db = testbase.db - - class OverrideTest(PersistTest): def testprocessing(self): @@ -21,7 +19,8 @@ class OverrideTest(PersistTest): return typeobj() def adapt_args(self): return self - + + global users users = Table('users', db, Column('user_id', Integer, primary_key = True), Column('goofy', MyType, nullable = False) @@ -41,11 +40,16 @@ class OverrideTest(PersistTest): print repr(l) self.assert_(l == [(2, u'BIND_INjackBIND_OUT'), (3, u'BIND_INlalaBIND_OUT'), (4, u'BIND_INfredBIND_OUT')]) + def tearDownAll(self): + global users + users.drop() + class ColumnsTest(AssertMixin): def testcolumns(self): expectedResults = { 'int_column': 'int_column INTEGER', + 'smallint_column': 'smallint_column SMALLINT', 'varchar_column': 'varchar_column VARCHAR(20)', 'numeric_column': 'numeric_column NUMERIC(12, 3)', 'float_column': 'float_column NUMERIC(25, 2)' @@ -57,6 +61,7 @@ class ColumnsTest(AssertMixin): print db.engine.__module__ testTable = Table('testColumns', db, Column('int_column', Integer), + Column('smallint_column', Smallinteger), Column('varchar_column', String(20)), Column('numeric_column', Numeric(12,3)), Column('float_column', Float(25)), @@ -97,39 +102,60 @@ class BinaryTest(AssertMixin): class DateTest(AssertMixin): def setUpAll(self): - global users_with_date - users_with_date = Table('query_users_with_date', db, - Column('user_id', INT, primary_key = True), - Column('user_name', VARCHAR(20)), - Column('user_date', DateTime), - redefine = True - ) + global users_with_date, insert_data + + insert_data = [[7, 'jack', datetime.datetime(2005, 11, 10, 0, 0), datetime.date(2005,11,10), datetime.time(12,20,2)], + [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35), datetime.date(2005,10,10), datetime.time(0,0,0)], + [9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35, 54839), datetime.date(1970,4,1), datetime.time(23,59,59,999)], + [10, 'colber', None, None, None]] + + fnames = ['user_id', 'user_name', 'user_datetime', 'user_date', 'user_time'] + + collist = [Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), Column('user_datetime', DateTime), + Column('user_date', Date), Column('user_time', Time)] + + + + if db.engine.__module__.endswith('mysql'): + # strip microseconds -- not supported by this engine (should be an easier way to detect this) + for d in insert_data: + d[2] = d[2].replace(microsecond=0) + d[4] = d[4].replace(microsecond=0) + + try: + db.type_descriptor(types.TIME).get_col_spec() + print "HI" + except: + # don't test TIME type -- not supported by this engine + insert_data = [d[:-1] for d in insert_data] + fnames = fnames[:-1] + collist = collist[:-1] + + + users_with_date = Table('query_users_with_date', db, redefine = True, *collist) users_with_date.create() - users_with_date.insert().execute(user_id = 7, user_name = 'jack', user_date=datetime.datetime(2005,11,10)) - users_with_date.insert().execute(user_id = 8, user_name = 'roy', user_date=datetime.datetime(2005,11,10, 11,52,35)) - users_with_date.insert().execute(user_id = 9, user_name = 'foo', user_date=datetime.datetime(2005,11,10, 11,52,35, 54839)) - users_with_date.insert().execute(user_id = 10, user_name = 'colber', user_date=None) + + insert_dicts = [dict(zip(fnames, d)) for d in insert_data] + for idict in insert_dicts: + users_with_date.insert().execute(**idict) # insert the data + def tearDownAll(self): users_with_date.drop() def testdate(self): - l = users_with_date.select().execute().fetchall() - l = [[c for c in r] for r in l] - if db.engine.__module__.endswith('mysql'): - x = [[7, 'jack', datetime.datetime(2005, 11, 10, 0, 0)], [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35)], [9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35)], [10, 'colber', None]] - else: - x = [[7, 'jack', datetime.datetime(2005, 11, 10, 0, 0)], [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35)], [9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35, 54839)], [10, 'colber', None]] - print repr(l) - print repr(x) - self.assert_(l == x) + global insert_data + + l = map(list, users_with_date.select().execute().fetchall()) + self.assert_(l == insert_data, 'DateTest mismatch: got:%s expected:%s' % (l, insert_data)) + def testtextdate(self): - x = db.text("select user_date from query_users_with_date", typemap={'user_date':DateTime}).execute().fetchall() + x = db.text("select user_datetime from query_users_with_date", typemap={'user_datetime':DateTime}).execute().fetchall() print repr(x) self.assert_(isinstance(x[0][0], datetime.datetime)) - #x = db.text("select * from query_users_with_date where user_date=:date", bindparams=[bindparam('date', )]).execute(date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall() + #x = db.text("select * from query_users_with_date where user_datetime=:date", bindparams=[bindparam('date', )]).execute(date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall() #print repr(x) if __name__ == "__main__": -- 2.47.2