class MSNumeric(sqltypes.Numeric):
def get_col_spec(self):
return "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length}
+class MSFloat(sqltypes.Float):
+ def get_col_spec(self):
+ return "FLOAT(%(precision)s)" % {'precision': self.precision}
class MSInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
colspecs = {
sqltypes.Integer : MSInteger,
sqltypes.Numeric : MSNumeric,
+ sqltypes.Float : MSFloat,
sqltypes.DateTime : MSDateTime,
sqltypes.String : MSString,
sqltypes.Binary : MSBinary,
'char' : MSChar,
'text' : MSText,
'decimal' : MSNumeric,
+ 'float' : MSFloat,
'timestamp' : MSDateTime,
'binary' : MSBinary,
}
class PGNumeric(sqltypes.Numeric):
def get_col_spec(self):
return "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length}
+class PGFloat(sqltypes.Float):
+ def get_col_spec(self):
+ return "FLOAT(%(precision)s)" % {'precision': self.precision}
class PGInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
pg2_colspecs = {
sqltypes.Integer : PGInteger,
sqltypes.Numeric : PGNumeric,
+ sqltypes.Float : PGFloat,
sqltypes.DateTime : PG2DateTime,
sqltypes.String : PGString,
sqltypes.Binary : PGBinary,
pg2_ischema_names = {
'integer' : PGInteger,
+ 'bigint' : PGInteger,
'character varying' : PGString,
'character' : PGChar,
'text' : PGText,
'numeric' : PGNumeric,
+ 'float' : PGFloat,
+ 'real' : PGFloat,
+ 'double precision' : PGFloat,
'timestamp without time zone' : PG2DateTime,
'bytea' : PGBinary,
+ 'boolean' : PGBoolean,
}
pg1_ischema_names = pg2_ischema_names.copy()
pg1_ischema_names['timestamp without time zone'] = PG1DateTime
colspecs = {
sqltypes.Integer : SLInteger,
sqltypes.Numeric : SLNumeric,
+ sqltypes.Float : SLNumeric,
sqltypes.DateTime : SLDateTime,
sqltypes.String : SLString,
sqltypes.Binary : SLBinary,
'CHAR' : SLChar,
'TEXT' : SLText,
'NUMERIC' : SLNumeric,
+ 'FLOAT' : SLNumeric,
'TIMESTAMP' : SLDateTime,
'BLOB' : SLBinary,
}
__all__ = [ 'TypeEngine', 'TypeDecorator', 'NullTypeEngine',
'INT', 'CHAR', 'VARCHAR', 'TEXT', 'FLOAT', 'DECIMAL',
- 'TIMESTAMP', 'DATETIME', 'CLOB', 'BLOB', 'BOOLEAN', 'String', 'Integer', 'Numeric', 'DateTime', 'Binary', 'Boolean', 'NULLTYPE'
+ 'TIMESTAMP', 'DATETIME', 'CLOB', 'BLOB', 'BOOLEAN', 'String', 'Integer', 'Numeric', 'Float', 'DateTime', 'Binary', 'Boolean', 'NULLTYPE'
]
def adapt(self, typeobj):
return typeobj(self.precision, self.length)
+class Float(NullTypeEngine):
+ def __init__(self, precision = 10):
+ self.precision = precision
+ def adapt(self, typeobj):
+ return typeobj(self.precision)
+
class DateTime(NullTypeEngine):
pass
class Boolean(NullTypeEngine):
pass
-class FLOAT(Numeric):pass
+class FLOAT(Float):pass
class TEXT(String):pass
class DECIMAL(Numeric):pass
class INT(Integer):pass
class BLOB(Binary): pass
class BOOLEAN(Boolean): pass
-NULLTYPE = NullTypeEngine()
\ No newline at end of file
+NULLTYPE = NullTypeEngine()
testbase.echo = False
def suite():
- modules_to_test = ('attributes', 'historyarray', 'pool', 'engines', 'query', 'types', 'mapper', 'objectstore', 'dependency', 'sequence', 'select')
+ modules_to_test = ('attributes', 'historyarray', 'pool', 'engines', 'query', 'types', 'mapper', 'objectstore', 'dependency', 'sequence', 'select', 'columns')
# modules_to_test = ('engines', 'mapper')
alltests = unittest.TestSuite()
for module in map(__import__, modules_to_test):
--- /dev/null
+from testbase import PersistTest
+import testbase
+import unittest, sys, datetime
+
+import sqlalchemy.databases.sqlite as sqllite
+
+from sqlalchemy import *
+
+class ColumnsTest(PersistTest):
+
+ defaultExpectedResults = { 'int_column': 'int_column INTEGER',
+ 'varchar_column': 'varchar_column VARCHAR(20)',
+ 'numeric_column': 'numeric_column NUMERIC(12, 3)',
+ 'float_column': 'float_column NUMERIC(25, 2)'
+ }
+
+ def setUp(self):
+ pass
+
+ def _buildTestTable(self):
+ testTable = Table('testColumns', self.db,
+ Column('int_column', INT),
+ Column('varchar_column', VARCHAR(20)),
+ Column('numeric_column', Numeric(12,3)),
+ Column('float_column', FLOAT(25)),
+ )
+ return testTable
+
+ def _doTest(self, expectedResults):
+ testTable = self._buildTestTable()
+ for aCol in testTable.c:
+ self.assertEquals(expectedResults[aCol.name], self.db.schemagenerator(None).get_column_specification(aCol))
+
+ def testSqliteColumns(self):
+ self.db = create_engine('sqlite', {'filename':':memory:'})
+ self._doTest(self.defaultExpectedResults)
+
+ def testPostgresColumns(self):
+ self.db = engine.create_engine('postgres', {'database':'test', 'host':'127.0.0.1', 'user':'scott', 'password':'tiger'}, echo=False)
+ expectedResults = self.defaultExpectedResults.copy()
+ expectedResults['float_column'] = 'float_column FLOAT(25)'
+ self._doTest(expectedResults)
+
+ def testMySqlColumns(self):
+ self.db = engine.create_engine('mysql', {'db':'test', 'host':'127.0.0.1', 'user':'scott', 'passwd':'tiger'}, echo=False)
+ expectedResults = self.defaultExpectedResults.copy()
+ expectedResults['float_column'] = 'float_column FLOAT(25)'
+ self._doTest(expectedResults)
+
+if __name__ == "__main__":
+ unittest.main()
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20), nullable = False),
Column('test1', CHAR(5), nullable = False),
- Column('test2', FLOAT(5,5), nullable = False),
+ Column('test2', FLOAT(5), nullable = False),
Column('test3', TEXT),
Column('test4', DECIMAL, nullable = False),
Column('test5', TIMESTAMP),