]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Rick Morrison's patch adding Smallint, Date, and Time support !
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 4 Feb 2006 16:44:00 +0000 (16:44 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 4 Feb 2006 16:44:00 +0000 (16:44 +0000)
lib/sqlalchemy/databases/mysql.py
lib/sqlalchemy/databases/oracle.py
lib/sqlalchemy/databases/postgres.py
lib/sqlalchemy/databases/sqlite.py
lib/sqlalchemy/types.py
test/testbase.py
test/types.py

index fda41a3fc4ef47b4bf4917baf566e2e956a6e973..268639b690562ec57eeb36127e86c40b310e23c2 100644 (file)
@@ -28,9 +28,18 @@ class MSFloat(sqltypes.Float):
 class MSInteger(sqltypes.Integer):
     def get_col_spec(self):
         return "INTEGER"
+class MSSmallInteger(sqltypes.Smallinteger):
+    def get_col_spec(self):
+        return "SMALLINT"
 class MSDateTime(sqltypes.DateTime):
     def get_col_spec(self):
         return "DATETIME"
+class MSDate(sqltypes.Date):
+    def get_col_spec(self):
+        return "DATE"
+class MSTime(sqltypes.Time):
+    def get_col_spec(self):
+        return "TIME"
 class MSText(sqltypes.TEXT):
     def get_col_spec(self):
         return "TEXT"
@@ -54,9 +63,12 @@ class MSBoolean(sqltypes.Boolean):
         
 colspecs = {
     sqltypes.Integer : MSInteger,
+    sqltypes.Smallinteger : MSSmallInteger,
     sqltypes.Numeric : MSNumeric,
     sqltypes.Float : MSFloat,
     sqltypes.DateTime : MSDateTime,
+    sqltypes.Date : MSDate,
+    sqltypes.Time : MSTime,
     sqltypes.String : MSString,
     sqltypes.Binary : MSBinary,
     sqltypes.Boolean : MSBoolean,
@@ -66,6 +78,7 @@ colspecs = {
 
 ischema_names = {
     'int' : MSInteger,
+    'smallint' : MSSmallInteger,
     'varchar' : MSString,
     'char' : MSChar,
     'text' : MSText,
@@ -73,6 +86,8 @@ ischema_names = {
     'float' : MSFloat,
     'timestamp' : MSDateTime,
     'datetime' : MSDateTime,
+    'date' : MSDate,
+    'time' : MSTime,
     'binary' : MSBinary,
     'blob' : MSBinary,
 }
index c3214c669e6d4e2fe1be112ba0f175ef8068f7bb..de4a76c3eff81458ef7932ea998153368494f7b3 100644 (file)
@@ -24,9 +24,17 @@ class OracleNumeric(sqltypes.Numeric):
 class OracleInteger(sqltypes.Integer):
     def get_col_spec(self):
         return "INTEGER"
+class OracleSmallInteger(sqltypes.Smallinteger):
+    def get_col_spec(self):
+        return "SMALLINT"
 class OracleDateTime(sqltypes.DateTime):
     def get_col_spec(self):
         return "DATE"
+# Note:
+# Oracle DATE == DATETIME
+# Oracle does not allow milliseconds in DATE
+# Oracle does not support TIME columns
+
 class OracleText(sqltypes.TEXT):
     def get_col_spec(self):
         return "CLOB"
@@ -45,8 +53,10 @@ class OracleBoolean(sqltypes.Boolean):
         
 colspecs = {
     sqltypes.Integer : OracleInteger,
+    sqltypes.Smallinteger : OracleSmallInteger,
     sqltypes.Numeric : OracleNumeric,
     sqltypes.DateTime : OracleDateTime,
+    sqltypes.Date : OracleDateTime,
     sqltypes.String : OracleString,
     sqltypes.Binary : OracleBinary,
     sqltypes.Boolean : OracleBoolean,
index 3a29186842c09a677fc28d2dd84d3800804af9a2..2d6adc165bb9005a20453f98d88bd7d8cb033ca2 100644 (file)
@@ -33,6 +33,9 @@ class PGFloat(sqltypes.Float):
 class PGInteger(sqltypes.Integer):
     def get_col_spec(self):
         return "INTEGER"
+class PGSmallInteger(sqltypes.Smallinteger):
+    def get_col_spec(self):
+        return "SMALLINT"
 class PG2DateTime(sqltypes.DateTime):
     def get_col_spec(self):
         return "TIMESTAMP"
@@ -46,6 +49,32 @@ class PG1DateTime(sqltypes.DateTime):
         return value
     def get_col_spec(self):
         return "TIMESTAMP"
+class PG2Date(sqltypes.Date):
+    def get_col_spec(self):
+        return "DATE"
+class PG1Date(sqltypes.Date):
+    def convert_bind_param(self, value, engine):
+        # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
+        # this one doesnt seem to work with the "emulation" mode
+        return psycopg.DateFromMx(value)
+    def convert_result_value(self, value, engine):
+        # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
+        return value
+    def get_col_spec(self):
+        return "DATE"
+class PG2Time(sqltypes.Date):
+    def get_col_spec(self):
+        return "TIME"
+class PG1Time(sqltypes.Date):
+    def convert_bind_param(self, value, engine):
+        # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
+        # this one doesnt seem to work with the "emulation" mode
+        return psycopg.TimeFromMx(value)
+    def convert_result_value(self, value, engine):
+        # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
+        return value
+    def get_col_spec(self):
+        return "TIME"
 class PGText(sqltypes.TEXT):
     def get_col_spec(self):
         return "TEXT"
@@ -64,9 +93,12 @@ class PGBoolean(sqltypes.Boolean):
         
 pg2_colspecs = {
     sqltypes.Integer : PGInteger,
+    sqltypes.Smallinteger : PGSmallInteger,
     sqltypes.Numeric : PGNumeric,
     sqltypes.Float : PGFloat,
     sqltypes.DateTime : PG2DateTime,
+    sqltypes.Date : PG2Date,
+    sqltypes.Time : PG2Time,
     sqltypes.String : PGString,
     sqltypes.Binary : PGBinary,
     sqltypes.Boolean : PGBoolean,
@@ -74,11 +106,16 @@ pg2_colspecs = {
     sqltypes.CHAR: PGChar,
 }
 pg1_colspecs = pg2_colspecs.copy()
-pg1_colspecs[sqltypes.DateTime] = PG1DateTime
+pg1_colspecs.update({
+    sqltypes.DateTime :  PG1DateTime,
+    sqltypes.Date : PG1Date,
+    sqltypes.Time : PG1Time
+    })
 
 pg2_ischema_names = {
     'integer' : PGInteger,
     'bigint' : PGInteger,
+    'smallint' : PGSmallInteger,
     'character varying' : PGString,
     'character' : PGChar,
     'text' : PGText,
@@ -88,12 +125,18 @@ pg2_ischema_names = {
     'double precision' : PGFloat,
     'timestamp with time zone' : PG2DateTime,
     'timestamp without time zone' : PG2DateTime,
+    'date' : PG2Date,
+    'time': PG2Time,
     'bytea' : PGBinary,
     'boolean' : PGBoolean,
 }
 pg1_ischema_names = pg2_ischema_names.copy()
-pg1_ischema_names['timestamp with time zone'] = \
-    pg1_ischema_names['timestamp without time zone'] = PG1DateTime
+pg1_ischema_names.update({
+    'timestamp with time zone' : PG1DateTime,
+    'timestamp without time zone' : PG1DateTime,
+    'date' : PG1Date,
+    'time' : PG1Time
+    })
 
 def engine(opts, **params):
     return PGSQLEngine(opts, **params)
index 9ada952ca054c102afbcea35c440f1710464502b..a403e3c1a61a00433e264b0cd1f0fe1bd478e344 100644 (file)
@@ -15,6 +15,8 @@ import sqlalchemy.types as sqltypes
 from sqlalchemy.ansisql import *
 import datetime,time
 
+pysqlite2_timesupport = False   # Change this if the init.d guys ever get around to supporting time cols
+
 try:
     from pysqlite2 import dbapi2 as sqlite
 except:
@@ -26,10 +28,13 @@ class SLNumeric(sqltypes.Numeric):
 class SLInteger(sqltypes.Integer):
     def get_col_spec(self):
         return "INTEGER"
+class SLSmallInteger(sqltypes.Smallinteger):
+    def get_col_spec(self):
+        return "SMALLINT"
 class SLDateTime(sqltypes.DateTime):
     def get_col_spec(self):
         return "TIMESTAMP"
-    def convert_result_value(self, value, engine):
+    def _cvt(self, value, engine, fmt):
         if value is None:
             return None
         parts = value.split('.')
@@ -38,9 +43,22 @@ class SLDateTime(sqltypes.DateTime):
             microsecond = int(microsecond)
         except ValueError:
             (value, microsecond) = (value, 0)
-        tup = time.strptime(value, "%Y-%m-%d %H:%M:%S")
-        return datetime.datetime(microsecond=microsecond, *tup[0:6])
-
+        return time.strptime(value, fmt)[0:6] + (microsecond,)
+    def convert_result_value(self, value, engine):
+        tup = self._cvt(value, engine, "%Y-%m-%d %H:%M:%S")
+        return tup and datetime.datetime(*tup)
+class SLDate(SLDateTime):
+    def get_col_spec(self):
+        return "DATE"
+    def convert_result_value(self, value, engine):
+        tup = self._cvt(value, engine, "%Y-%m-%d")
+        return tup and datetime.date(*tup[0:3])
+class SLTime(SLDateTime):
+    def get_col_spec(self):
+        return "TIME"
+    def convert_result_value(self, value, engine):
+        tup = self._cvt(value, engine, "%H:%M:%S")
+        return tup and datetime.time(*tup[4:7])
 class SLText(sqltypes.TEXT):
     def get_col_spec(self):
         return "TEXT"
@@ -59,9 +77,11 @@ class SLBoolean(sqltypes.Boolean):
         
 colspecs = {
     sqltypes.Integer : SLInteger,
+    sqltypes.Smallinteger : SLSmallInteger,
     sqltypes.Numeric : SLNumeric,
     sqltypes.Float : SLNumeric,
     sqltypes.DateTime : SLDateTime,
+    sqltypes.Date : SLDate,
     sqltypes.String : SLString,
     sqltypes.Binary : SLBinary,
     sqltypes.Boolean : SLBoolean,
@@ -71,15 +91,22 @@ colspecs = {
 
 pragma_names = {
     'INTEGER' : SLInteger,
+    'SMALLINT' : SLSmallInteger,
     'VARCHAR' : SLString,
     'CHAR' : SLChar,
     'TEXT' : SLText,
     'NUMERIC' : SLNumeric,
     'FLOAT' : SLNumeric,
     'TIMESTAMP' : SLDateTime,
+    'DATETIME' : SLDateTime,
+    'DATE' : SLDate,
     'BLOB' : SLBinary,
 }
 
+if pysqlite2_timesupport:
+    colspecs.update({sqltypes.Time : SLTime})
+    pragma_names.update({'TIME' : SLTime})
+    
 def engine(opts, **params):
     return SQLiteSQLEngine(opts, **params)
 
index defd6819bb189ffbed91d72a2c6869ce68823667..f76455ebd323abb1ab1d00c5a04095f9dedfb7b8 100644 (file)
@@ -6,7 +6,8 @@
 
 __all__ = [ 'TypeEngine', 'TypeDecorator', 'NullTypeEngine',
             'INT', 'CHAR', 'VARCHAR', 'TEXT', 'FLOAT', 'DECIMAL', 
-            'TIMESTAMP', 'DATETIME', 'CLOB', 'BLOB', 'BOOLEAN', 'String', 'Integer', 'Numeric', 'Float', 'DateTime', 'Binary', 'Boolean', 'Unicode', 'NULLTYPE'
+            'TIMESTAMP', 'DATETIME', 'CLOB', 'BLOB', 'BOOLEAN', 'String', 'Integer', 'Smallinteger',
+            'Numeric', 'Float', 'DateTime', 'Date', 'Time', 'Binary', 'Boolean', 'Unicode', 'NULLTYPE'
             ]
 
 import sqlalchemy.util as util
@@ -88,6 +89,10 @@ class Integer(NullTypeEngine):
     # seems to be not needed with SQLite, Postgres
     pass
 
+class Smallinteger(Integer):
+    """ smallint datatype """
+    pass
+
 class Numeric(NullTypeEngine):
     def __init__(self, precision = 10, length = 2):
         self.precision = precision
@@ -104,6 +109,12 @@ class Float(NullTypeEngine):
 class DateTime(NullTypeEngine):
     pass
 
+class Date(NullTypeEngine):
+    pass
+
+class Time(NullTypeEngine):
+    pass
+
 class Binary(NullTypeEngine):
     def __init__(self, length=None):
         self.length = length
@@ -122,8 +133,11 @@ class TEXT(String):pass
 class DECIMAL(Numeric):pass
 class INT(Integer):pass
 INTEGER = INT
+class SMALLINT(Smallinteger):pass
 class TIMESTAMP(DateTime): pass
 class DATETIME(DateTime): pass
+class DATE(Date): pass
+class TIME(Time): pass
 class CLOB(String): pass
 class VARCHAR(String): pass
 class CHAR(String):pass
index b84c08e273a27049da79ebc9dd5c391b69a51f43..4802b1185571028bc5146663c81a67c65d8da56b 100644 (file)
@@ -7,6 +7,7 @@ import sqlalchemy.databases.postgres as postgres
 #import sqlalchemy.databases.mysql as mysql
 
 echo = True
+#echo = False
 #echo = 'debug'
 db = None
 db_uri = None
@@ -14,31 +15,32 @@ db_uri = None
 def parse_argv():
     # we are using the unittest main runner, so we are just popping out the 
     # arguments we need instead of using our own getopt type of thing
+    global db, db_uri
+    
+    DBTYPE = 'sqlite'
+
     if len(sys.argv) >= 3:
-        if sys.argv[1] == '--db':
+        if sys.argv[1] == '--dburi':
+            (param, db_uri) =  (sys.argv.pop(1), sys.argv.pop(1))
+        elif sys.argv[1] == '--db':
             (param, DBTYPE) = (sys.argv.pop(1), sys.argv.pop(1))
-    else:
-        DBTYPE = 'sqlite'
 
-    global db, db_uri
-    if DBTYPE == 'sqlite':
-        try:
+    if (None == db_uri):
+        if DBTYPE == 'sqlite':
             db_uri = 'sqlite://filename=:memory:'
-            db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
-        except:
-            raise "Could not create sqlite engine.  specify --db <sqlite|sqlite_file|postgres|mysql|oracle> to test runner."
-    elif DBTYPE == 'sqlite_file':
-        db_uri = 'sqlite://filename=querytest.db'
-        db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
-    elif DBTYPE == 'postgres':
-        db_uri = 'postgres://database=test&port=5432&host=127.0.0.1&user=scott&password=tiger'
-        db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
-    elif DBTYPE == 'mysql':
-        db_uri = 'mysql://db=test&host=127.0.0.1&user=scott&passwd=tiger'
-        db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
-    elif DBTYPE == 'oracle':
-        db_uri = 'oracle://user=scott&password=tiger'
+        elif DBTYPE == 'sqlite_file':
+            db_uri = 'sqlite://filename=querytest.db'
+        elif DBTYPE == 'postgres':
+            db_uri = 'postgres://database=test&port=5432&host=127.0.0.1&user=scott&password=tiger'
+        elif DBTYPE == 'mysql':
+            db_uri = 'mysql://db=test&host=127.0.0.1&user=scott&passwd=tiger'
+        elif DBTYPE == 'oracle':
+            db_uri = 'oracle://user=scott&password=tiger'
+    try:
         db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
+    except:
+        raise "Could not create engine.  specify --db <sqlite|sqlite_file|postgres|mysql|oracle> to test runner."
+        
     db = EngineAssert(db)
 
 class PersistTest(unittest.TestCase):
index 155c4ad3b4e10ec9daa79d26bf547afb3b72f37f..f7b30fd2c152a49e79d6d7a9678aadee12fa82c7 100644 (file)
@@ -5,8 +5,6 @@ import testbase
     
 db = testbase.db
 
-
-
 class OverrideTest(PersistTest):
 
     def testprocessing(self):
@@ -21,7 +19,8 @@ class OverrideTest(PersistTest):
                 return typeobj()
             def adapt_args(self):
                 return self
-                
+
+        global users
         users = Table('users', db, 
             Column('user_id', Integer, primary_key = True),
             Column('goofy', MyType, nullable = False)
@@ -41,11 +40,16 @@ class OverrideTest(PersistTest):
         print repr(l)
         self.assert_(l == [(2, u'BIND_INjackBIND_OUT'), (3, u'BIND_INlalaBIND_OUT'), (4, u'BIND_INfredBIND_OUT')])
 
+    def tearDownAll(self):
+        global users
+        users.drop()
+
 
 class ColumnsTest(AssertMixin):
 
     def testcolumns(self):
         expectedResults = { 'int_column': 'int_column INTEGER',
+                            'smallint_column': 'smallint_column SMALLINT',
                                    'varchar_column': 'varchar_column VARCHAR(20)',
                                    'numeric_column': 'numeric_column NUMERIC(12, 3)',
                                    'float_column': 'float_column NUMERIC(25, 2)'
@@ -57,6 +61,7 @@ class ColumnsTest(AssertMixin):
         print db.engine.__module__
         testTable = Table('testColumns', db,
             Column('int_column', Integer),
+            Column('smallint_column', Smallinteger),
             Column('varchar_column', String(20)),
             Column('numeric_column', Numeric(12,3)),
             Column('float_column', Float(25)),
@@ -97,39 +102,60 @@ class BinaryTest(AssertMixin):
         
 class DateTest(AssertMixin):
     def setUpAll(self):
-        global users_with_date
-        users_with_date = Table('query_users_with_date', db,
-        Column('user_id', INT, primary_key = True),
-        Column('user_name', VARCHAR(20)),
-        Column('user_date', DateTime),
-        redefine = True
-        )
+        global users_with_date, insert_data
+
+        insert_data =  [[7, 'jack', datetime.datetime(2005, 11, 10, 0, 0), datetime.date(2005,11,10), datetime.time(12,20,2)],
+                        [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35), datetime.date(2005,10,10), datetime.time(0,0,0)],
+                        [9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35, 54839), datetime.date(1970,4,1), datetime.time(23,59,59,999)],
+                        [10, 'colber', None, None, None]]
+
+        fnames = ['user_id', 'user_name', 'user_datetime', 'user_date', 'user_time']
+
+        collist = [Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), Column('user_datetime', DateTime),
+                   Column('user_date', Date), Column('user_time', Time)]
+
+
+        
+        if db.engine.__module__.endswith('mysql'):
+            # strip microseconds -- not supported by this engine (should be an easier way to detect this)
+            for d in insert_data:
+                d[2] = d[2].replace(microsecond=0)
+                d[4] = d[4].replace(microsecond=0)
+        
+        try:
+            db.type_descriptor(types.TIME).get_col_spec()
+            print  "HI"
+        except:
+            # don't test TIME type -- not supported by this engine
+            insert_data = [d[:-1] for d in insert_data]
+            fnames = fnames[:-1]
+            collist = collist[:-1]
+
+
+        users_with_date = Table('query_users_with_date', db, redefine = True, *collist)
         users_with_date.create()
-        users_with_date.insert().execute(user_id = 7, user_name = 'jack', user_date=datetime.datetime(2005,11,10))
-        users_with_date.insert().execute(user_id = 8, user_name = 'roy', user_date=datetime.datetime(2005,11,10, 11,52,35))
-        users_with_date.insert().execute(user_id = 9, user_name = 'foo', user_date=datetime.datetime(2005,11,10, 11,52,35, 54839))
-        users_with_date.insert().execute(user_id = 10, user_name = 'colber', user_date=None)
+
+        insert_dicts = [dict(zip(fnames, d)) for d in insert_data]
+        for idict in insert_dicts:
+            users_with_date.insert().execute(**idict) # insert the data
+
     def tearDownAll(self):
         users_with_date.drop()
 
     def testdate(self):
-        l = users_with_date.select().execute().fetchall()
-        l = [[c for c in r] for r in l]
-        if db.engine.__module__.endswith('mysql'):
-            x = [[7, 'jack', datetime.datetime(2005, 11, 10, 0, 0)], [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35)], [9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35)], [10, 'colber', None]]
-        else:
-            x = [[7, 'jack', datetime.datetime(2005, 11, 10, 0, 0)], [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35)], [9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35, 54839)], [10, 'colber', None]]
-        print repr(l)
-        print repr(x)
-        self.assert_(l == x)
+        global insert_data
+
+        l = map(list, users_with_date.select().execute().fetchall())
+        self.assert_(l == insert_data, 'DateTest mismatch: got:%s expected:%s' % (l, insert_data))
+
 
     def testtextdate(self):     
-        x = db.text("select user_date from query_users_with_date", typemap={'user_date':DateTime}).execute().fetchall()
+        x = db.text("select user_datetime from query_users_with_date", typemap={'user_datetime':DateTime}).execute().fetchall()
         
         print repr(x)
         self.assert_(isinstance(x[0][0], datetime.datetime))
         
-        #x = db.text("select * from query_users_with_date where user_date=:date", bindparams=[bindparam('date', )]).execute(date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall()
+        #x = db.text("select * from query_users_with_date where user_datetime=:date", bindparams=[bindparam('date', )]).execute(date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall()
         #print repr(x)
         
 if __name__ == "__main__":