class MSInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
+class MSSmallInteger(sqltypes.Smallinteger):
+ def get_col_spec(self):
+ return "SMALLINT"
class MSDateTime(sqltypes.DateTime):
def get_col_spec(self):
return "DATETIME"
+class MSDate(sqltypes.Date):
+ def get_col_spec(self):
+ return "DATE"
+class MSTime(sqltypes.Time):
+ def get_col_spec(self):
+ return "TIME"
class MSText(sqltypes.TEXT):
def get_col_spec(self):
return "TEXT"
colspecs = {
sqltypes.Integer : MSInteger,
+ sqltypes.Smallinteger : MSSmallInteger,
sqltypes.Numeric : MSNumeric,
sqltypes.Float : MSFloat,
sqltypes.DateTime : MSDateTime,
+ sqltypes.Date : MSDate,
+ sqltypes.Time : MSTime,
sqltypes.String : MSString,
sqltypes.Binary : MSBinary,
sqltypes.Boolean : MSBoolean,
ischema_names = {
'int' : MSInteger,
+ 'smallint' : MSSmallInteger,
'varchar' : MSString,
'char' : MSChar,
'text' : MSText,
'float' : MSFloat,
'timestamp' : MSDateTime,
'datetime' : MSDateTime,
+ 'date' : MSDate,
+ 'time' : MSTime,
'binary' : MSBinary,
'blob' : MSBinary,
}
class OracleInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
+class OracleSmallInteger(sqltypes.Smallinteger):
+ def get_col_spec(self):
+ return "SMALLINT"
class OracleDateTime(sqltypes.DateTime):
def get_col_spec(self):
return "DATE"
+# Note:
+# Oracle DATE == DATETIME
+# Oracle does not allow milliseconds in DATE
+# Oracle does not support TIME columns
+
class OracleText(sqltypes.TEXT):
def get_col_spec(self):
return "CLOB"
colspecs = {
sqltypes.Integer : OracleInteger,
+ sqltypes.Smallinteger : OracleSmallInteger,
sqltypes.Numeric : OracleNumeric,
sqltypes.DateTime : OracleDateTime,
+ sqltypes.Date : OracleDateTime,
sqltypes.String : OracleString,
sqltypes.Binary : OracleBinary,
sqltypes.Boolean : OracleBoolean,
class PGInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
+class PGSmallInteger(sqltypes.Smallinteger):
+ def get_col_spec(self):
+ return "SMALLINT"
class PG2DateTime(sqltypes.DateTime):
def get_col_spec(self):
return "TIMESTAMP"
return value
def get_col_spec(self):
return "TIMESTAMP"
+class PG2Date(sqltypes.Date):
+ def get_col_spec(self):
+ return "DATE"
+class PG1Date(sqltypes.Date):
+ def convert_bind_param(self, value, engine):
+ # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
+ # this one doesnt seem to work with the "emulation" mode
+ return psycopg.DateFromMx(value)
+ def convert_result_value(self, value, engine):
+ # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
+ return value
+ def get_col_spec(self):
+ return "DATE"
+class PG2Time(sqltypes.Date):
+ def get_col_spec(self):
+ return "TIME"
+class PG1Time(sqltypes.Date):
+ def convert_bind_param(self, value, engine):
+ # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
+ # this one doesnt seem to work with the "emulation" mode
+ return psycopg.TimeFromMx(value)
+ def convert_result_value(self, value, engine):
+ # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
+ return value
+ def get_col_spec(self):
+ return "TIME"
class PGText(sqltypes.TEXT):
def get_col_spec(self):
return "TEXT"
pg2_colspecs = {
sqltypes.Integer : PGInteger,
+ sqltypes.Smallinteger : PGSmallInteger,
sqltypes.Numeric : PGNumeric,
sqltypes.Float : PGFloat,
sqltypes.DateTime : PG2DateTime,
+ sqltypes.Date : PG2Date,
+ sqltypes.Time : PG2Time,
sqltypes.String : PGString,
sqltypes.Binary : PGBinary,
sqltypes.Boolean : PGBoolean,
sqltypes.CHAR: PGChar,
}
pg1_colspecs = pg2_colspecs.copy()
-pg1_colspecs[sqltypes.DateTime] = PG1DateTime
+pg1_colspecs.update({
+ sqltypes.DateTime : PG1DateTime,
+ sqltypes.Date : PG1Date,
+ sqltypes.Time : PG1Time
+ })
pg2_ischema_names = {
'integer' : PGInteger,
'bigint' : PGInteger,
+ 'smallint' : PGSmallInteger,
'character varying' : PGString,
'character' : PGChar,
'text' : PGText,
'double precision' : PGFloat,
'timestamp with time zone' : PG2DateTime,
'timestamp without time zone' : PG2DateTime,
+ 'date' : PG2Date,
+ 'time': PG2Time,
'bytea' : PGBinary,
'boolean' : PGBoolean,
}
pg1_ischema_names = pg2_ischema_names.copy()
-pg1_ischema_names['timestamp with time zone'] = \
- pg1_ischema_names['timestamp without time zone'] = PG1DateTime
+pg1_ischema_names.update({
+ 'timestamp with time zone' : PG1DateTime,
+ 'timestamp without time zone' : PG1DateTime,
+ 'date' : PG1Date,
+ 'time' : PG1Time
+ })
def engine(opts, **params):
return PGSQLEngine(opts, **params)
from sqlalchemy.ansisql import *
import datetime,time
+pysqlite2_timesupport = False # Change this if the init.d guys ever get around to supporting time cols
+
try:
from pysqlite2 import dbapi2 as sqlite
except:
class SLInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
+class SLSmallInteger(sqltypes.Smallinteger):
+ def get_col_spec(self):
+ return "SMALLINT"
class SLDateTime(sqltypes.DateTime):
def get_col_spec(self):
return "TIMESTAMP"
- def convert_result_value(self, value, engine):
+ def _cvt(self, value, engine, fmt):
if value is None:
return None
parts = value.split('.')
microsecond = int(microsecond)
except ValueError:
(value, microsecond) = (value, 0)
- tup = time.strptime(value, "%Y-%m-%d %H:%M:%S")
- return datetime.datetime(microsecond=microsecond, *tup[0:6])
-
+ return time.strptime(value, fmt)[0:6] + (microsecond,)
+ def convert_result_value(self, value, engine):
+ tup = self._cvt(value, engine, "%Y-%m-%d %H:%M:%S")
+ return tup and datetime.datetime(*tup)
+class SLDate(SLDateTime):
+ def get_col_spec(self):
+ return "DATE"
+ def convert_result_value(self, value, engine):
+ tup = self._cvt(value, engine, "%Y-%m-%d")
+ return tup and datetime.date(*tup[0:3])
+class SLTime(SLDateTime):
+ def get_col_spec(self):
+ return "TIME"
+ def convert_result_value(self, value, engine):
+ tup = self._cvt(value, engine, "%H:%M:%S")
+ return tup and datetime.time(*tup[4:7])
class SLText(sqltypes.TEXT):
def get_col_spec(self):
return "TEXT"
colspecs = {
sqltypes.Integer : SLInteger,
+ sqltypes.Smallinteger : SLSmallInteger,
sqltypes.Numeric : SLNumeric,
sqltypes.Float : SLNumeric,
sqltypes.DateTime : SLDateTime,
+ sqltypes.Date : SLDate,
sqltypes.String : SLString,
sqltypes.Binary : SLBinary,
sqltypes.Boolean : SLBoolean,
pragma_names = {
'INTEGER' : SLInteger,
+ 'SMALLINT' : SLSmallInteger,
'VARCHAR' : SLString,
'CHAR' : SLChar,
'TEXT' : SLText,
'NUMERIC' : SLNumeric,
'FLOAT' : SLNumeric,
'TIMESTAMP' : SLDateTime,
+ 'DATETIME' : SLDateTime,
+ 'DATE' : SLDate,
'BLOB' : SLBinary,
}
+if pysqlite2_timesupport:
+ colspecs.update({sqltypes.Time : SLTime})
+ pragma_names.update({'TIME' : SLTime})
+
def engine(opts, **params):
return SQLiteSQLEngine(opts, **params)
__all__ = [ 'TypeEngine', 'TypeDecorator', 'NullTypeEngine',
'INT', 'CHAR', 'VARCHAR', 'TEXT', 'FLOAT', 'DECIMAL',
- 'TIMESTAMP', 'DATETIME', 'CLOB', 'BLOB', 'BOOLEAN', 'String', 'Integer', 'Numeric', 'Float', 'DateTime', 'Binary', 'Boolean', 'Unicode', 'NULLTYPE'
+ 'TIMESTAMP', 'DATETIME', 'CLOB', 'BLOB', 'BOOLEAN', 'String', 'Integer', 'Smallinteger',
+ 'Numeric', 'Float', 'DateTime', 'Date', 'Time', 'Binary', 'Boolean', 'Unicode', 'NULLTYPE'
]
import sqlalchemy.util as util
# seems to be not needed with SQLite, Postgres
pass
+class Smallinteger(Integer):
+ """ smallint datatype """
+ pass
+
class Numeric(NullTypeEngine):
def __init__(self, precision = 10, length = 2):
self.precision = precision
class DateTime(NullTypeEngine):
pass
+class Date(NullTypeEngine):
+ pass
+
+class Time(NullTypeEngine):
+ pass
+
class Binary(NullTypeEngine):
def __init__(self, length=None):
self.length = length
class DECIMAL(Numeric):pass
class INT(Integer):pass
INTEGER = INT
+class SMALLINT(Smallinteger):pass
class TIMESTAMP(DateTime): pass
class DATETIME(DateTime): pass
+class DATE(Date): pass
+class TIME(Time): pass
class CLOB(String): pass
class VARCHAR(String): pass
class CHAR(String):pass
#import sqlalchemy.databases.mysql as mysql
echo = True
+#echo = False
#echo = 'debug'
db = None
db_uri = None
def parse_argv():
# we are using the unittest main runner, so we are just popping out the
# arguments we need instead of using our own getopt type of thing
+ global db, db_uri
+
+ DBTYPE = 'sqlite'
+
if len(sys.argv) >= 3:
- if sys.argv[1] == '--db':
+ if sys.argv[1] == '--dburi':
+ (param, db_uri) = (sys.argv.pop(1), sys.argv.pop(1))
+ elif sys.argv[1] == '--db':
(param, DBTYPE) = (sys.argv.pop(1), sys.argv.pop(1))
- else:
- DBTYPE = 'sqlite'
- global db, db_uri
- if DBTYPE == 'sqlite':
- try:
+ if (None == db_uri):
+ if DBTYPE == 'sqlite':
db_uri = 'sqlite://filename=:memory:'
- db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
- except:
- raise "Could not create sqlite engine. specify --db <sqlite|sqlite_file|postgres|mysql|oracle> to test runner."
- elif DBTYPE == 'sqlite_file':
- db_uri = 'sqlite://filename=querytest.db'
- db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
- elif DBTYPE == 'postgres':
- db_uri = 'postgres://database=test&port=5432&host=127.0.0.1&user=scott&password=tiger'
- db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
- elif DBTYPE == 'mysql':
- db_uri = 'mysql://db=test&host=127.0.0.1&user=scott&passwd=tiger'
- db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
- elif DBTYPE == 'oracle':
- db_uri = 'oracle://user=scott&password=tiger'
+ elif DBTYPE == 'sqlite_file':
+ db_uri = 'sqlite://filename=querytest.db'
+ elif DBTYPE == 'postgres':
+ db_uri = 'postgres://database=test&port=5432&host=127.0.0.1&user=scott&password=tiger'
+ elif DBTYPE == 'mysql':
+ db_uri = 'mysql://db=test&host=127.0.0.1&user=scott&passwd=tiger'
+ elif DBTYPE == 'oracle':
+ db_uri = 'oracle://user=scott&password=tiger'
+ try:
db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
+ except:
+ raise "Could not create engine. specify --db <sqlite|sqlite_file|postgres|mysql|oracle> to test runner."
+
db = EngineAssert(db)
class PersistTest(unittest.TestCase):
db = testbase.db
-
-
class OverrideTest(PersistTest):
def testprocessing(self):
return typeobj()
def adapt_args(self):
return self
-
+
+ global users
users = Table('users', db,
Column('user_id', Integer, primary_key = True),
Column('goofy', MyType, nullable = False)
print repr(l)
self.assert_(l == [(2, u'BIND_INjackBIND_OUT'), (3, u'BIND_INlalaBIND_OUT'), (4, u'BIND_INfredBIND_OUT')])
+ def tearDownAll(self):
+ global users
+ users.drop()
+
class ColumnsTest(AssertMixin):
def testcolumns(self):
expectedResults = { 'int_column': 'int_column INTEGER',
+ 'smallint_column': 'smallint_column SMALLINT',
'varchar_column': 'varchar_column VARCHAR(20)',
'numeric_column': 'numeric_column NUMERIC(12, 3)',
'float_column': 'float_column NUMERIC(25, 2)'
print db.engine.__module__
testTable = Table('testColumns', db,
Column('int_column', Integer),
+ Column('smallint_column', Smallinteger),
Column('varchar_column', String(20)),
Column('numeric_column', Numeric(12,3)),
Column('float_column', Float(25)),
class DateTest(AssertMixin):
def setUpAll(self):
- global users_with_date
- users_with_date = Table('query_users_with_date', db,
- Column('user_id', INT, primary_key = True),
- Column('user_name', VARCHAR(20)),
- Column('user_date', DateTime),
- redefine = True
- )
+ global users_with_date, insert_data
+
+ insert_data = [[7, 'jack', datetime.datetime(2005, 11, 10, 0, 0), datetime.date(2005,11,10), datetime.time(12,20,2)],
+ [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35), datetime.date(2005,10,10), datetime.time(0,0,0)],
+ [9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35, 54839), datetime.date(1970,4,1), datetime.time(23,59,59,999)],
+ [10, 'colber', None, None, None]]
+
+ fnames = ['user_id', 'user_name', 'user_datetime', 'user_date', 'user_time']
+
+ collist = [Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), Column('user_datetime', DateTime),
+ Column('user_date', Date), Column('user_time', Time)]
+
+
+
+ if db.engine.__module__.endswith('mysql'):
+ # strip microseconds -- not supported by this engine (should be an easier way to detect this)
+ for d in insert_data:
+ d[2] = d[2].replace(microsecond=0)
+ d[4] = d[4].replace(microsecond=0)
+
+ try:
+ db.type_descriptor(types.TIME).get_col_spec()
+ print "HI"
+ except:
+ # don't test TIME type -- not supported by this engine
+ insert_data = [d[:-1] for d in insert_data]
+ fnames = fnames[:-1]
+ collist = collist[:-1]
+
+
+ users_with_date = Table('query_users_with_date', db, redefine = True, *collist)
users_with_date.create()
- users_with_date.insert().execute(user_id = 7, user_name = 'jack', user_date=datetime.datetime(2005,11,10))
- users_with_date.insert().execute(user_id = 8, user_name = 'roy', user_date=datetime.datetime(2005,11,10, 11,52,35))
- users_with_date.insert().execute(user_id = 9, user_name = 'foo', user_date=datetime.datetime(2005,11,10, 11,52,35, 54839))
- users_with_date.insert().execute(user_id = 10, user_name = 'colber', user_date=None)
+
+ insert_dicts = [dict(zip(fnames, d)) for d in insert_data]
+ for idict in insert_dicts:
+ users_with_date.insert().execute(**idict) # insert the data
+
def tearDownAll(self):
users_with_date.drop()
def testdate(self):
- l = users_with_date.select().execute().fetchall()
- l = [[c for c in r] for r in l]
- if db.engine.__module__.endswith('mysql'):
- x = [[7, 'jack', datetime.datetime(2005, 11, 10, 0, 0)], [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35)], [9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35)], [10, 'colber', None]]
- else:
- x = [[7, 'jack', datetime.datetime(2005, 11, 10, 0, 0)], [8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35)], [9, 'foo', datetime.datetime(2005, 11, 10, 11, 52, 35, 54839)], [10, 'colber', None]]
- print repr(l)
- print repr(x)
- self.assert_(l == x)
+ global insert_data
+
+ l = map(list, users_with_date.select().execute().fetchall())
+ self.assert_(l == insert_data, 'DateTest mismatch: got:%s expected:%s' % (l, insert_data))
+
def testtextdate(self):
- x = db.text("select user_date from query_users_with_date", typemap={'user_date':DateTime}).execute().fetchall()
+ x = db.text("select user_datetime from query_users_with_date", typemap={'user_datetime':DateTime}).execute().fetchall()
print repr(x)
self.assert_(isinstance(x[0][0], datetime.datetime))
- #x = db.text("select * from query_users_with_date where user_date=:date", bindparams=[bindparam('date', )]).execute(date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall()
+ #x = db.text("select * from query_users_with_date where user_datetime=:date", bindparams=[bindparam('date', )]).execute(date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall()
#print repr(x)
if __name__ == "__main__":