import sqlalchemy.util as util
from array import array
+RESERVED_WORDS = util.Set(
+ ['accessible', 'add', 'all', 'alter', 'analyze','and', 'as', 'asc',
+ 'asensitive', 'before', 'between', 'bigint', 'binary', 'blob', 'both',
+ 'by', 'call', 'cascade', 'case', 'change', 'char', 'character', 'check',
+ 'collate', 'column', 'condition', 'constraint', 'continue', 'convert',
+ 'create', 'cross', 'current_date', 'current_time', 'current_timestamp',
+ 'current_user', 'cursor', 'database', 'databases', 'day_hour',
+ 'day_microsecond', 'day_minute', 'day_second', 'dec', 'decimal',
+ 'declare', 'default', 'delayed', 'delete', 'desc', 'describe',
+ 'deterministic', 'distinct', 'distinctrow', 'div', 'double', 'drop',
+ 'dual', 'each', 'else', 'elseif', 'enclosed', 'escaped', 'exists',
+ 'exit', 'explain', 'false', 'fetch', 'float', 'float4', 'float8',
+ 'for', 'force', 'foreign', 'from', 'fulltext', 'grant', 'group', 'having',
+ 'high_priority', 'hour_microsecond', 'hour_minute', 'hour_second', 'if',
+ 'ignore', 'in', 'index', 'infile', 'inner', 'inout', 'insensitive',
+ 'insert', 'int', 'int1', 'int2', 'int3', 'int4', 'int8', 'integer',
+ 'interval', 'into', 'is', 'iterate', 'join', 'key', 'keys', 'kill',
+ 'leading', 'leave', 'left', 'like', 'limit', 'linear', 'lines', 'load',
+ 'localtime', 'localtimestamp', 'lock', 'long', 'longblob', 'longtext',
+ 'loop', 'low_priority', 'master_ssl_verify_server_cert', 'match',
+ 'mediumblob', 'mediumint', 'mediumtext', 'middleint',
+ 'minute_microsecond', 'minute_second', 'mod', 'modifies', 'natural',
+ 'not', 'no_write_to_binlog', 'null', 'numeric', 'on', 'optimize',
+ 'option', 'optionally', 'or', 'order', 'out', 'outer', 'outfile',
+ 'precision', 'primary', 'procedure', 'purge', 'range', 'read', 'reads',
+ 'read_only', 'read_write', 'real', 'references', 'regexp', 'release',
+ 'rename', 'repeat', 'replace', 'require', 'restrict', 'return',
+ 'revoke', 'right', 'rlike', 'schema', 'schemas', 'second_microsecond',
+ 'select', 'sensitive', 'separator', 'set', 'show', 'smallint', 'spatial',
+ 'specific', 'sql', 'sqlexception', 'sqlstate', 'sqlwarning',
+ 'sql_big_result', 'sql_calc_found_rows', 'sql_small_result', 'ssl',
+ 'starting', 'straight_join', 'table', 'terminated', 'then', 'tinyblob',
+ 'tinyint', 'tinytext', 'to', 'trailing', 'trigger', 'true', 'undo',
+ 'union', 'unique', 'unlock', 'unsigned', 'update', 'usage', 'use',
+ 'using', 'utc_date', 'utc_time', 'utc_timestamp', 'values', 'varbinary',
+ 'varchar', 'varcharacter', 'varying', 'when', 'where', 'while', 'with',
+ 'write', 'x509', 'xor', 'year_month', 'zerofill',
+ 'accessible', 'linear', 'master_ssl_verify_server_cert', 'range',
+ 'read_only', 'read_write'])
+
+class _NumericType(object):
+ "Base for MySQL numeric types."
+
+ def __init__(self, unsigned=False, zerofill=False, **kw):
+ self.unsigned = unsigned
+ self.zerofill = zerofill
+
+ def _extend(self, spec):
+ "Extend a numeric-type declaration with MySQL specific extensions."
+
+ if self.unsigned:
+ spec += ' UNSIGNED'
+ if self.zerofill:
+ spec += ' ZEROFILL'
+ return spec
+
+class _StringType(object):
+ "Base for MySQL string types."
+
+ def __init__(self, charset=None, collation=None,
+ ascii=False, unicode=False, binary=False,
+ national=False, **kwargs):
+ self.charset = charset
+ # allow collate= or collation=
+ self.collation = kwargs.get('collate', collation)
+ self.ascii = ascii
+ self.unicode = unicode
+ self.binary = binary
+ self.national = national
+
+ def _extend(self, spec):
+ "Extend a string-type declaration with MySQL specific extensions."
+
+ if self.charset:
+ charset = 'CHARACTER SET %s' % self.charset
+ elif self.ascii:
+ charset = 'ASCII'
+ elif self.unicode:
+ charset = 'UNICODE'
+ else:
+ charset = None
-def kw_colspec(self, spec):
- if self.unsigned:
- spec += ' UNSIGNED'
- if self.zerofill:
- spec += ' ZEROFILL'
- return spec
-
-class MSNumeric(sqltypes.Numeric):
+ if self.collation:
+ collation = 'COLLATE %s' % self.collation
+ elif self.binary:
+ collation = 'BINARY'
+ else:
+ collation = None
+
+ if self.national:
+ # NATIONAL (aka NCHAR/NVARCHAR) trumps charsets.
+ return ' '.join([c for c in ('NATIONAL', spec, collation)
+ if c is not None])
+ return ' '.join([c for c in (spec, charset, collation)
+ if c is not None])
+
+class MSNumeric(sqltypes.Numeric, _NumericType):
def __init__(self, precision = 10, length = 2, **kw):
- self.unsigned = 'unsigned' in kw
- self.zerofill = 'zerofill' in kw
- super(MSNumeric, self).__init__(precision, length)
+ _NumericType.__init__(self, **kw)
+ sqltypes.Numeric.__init__(self, precision, length)
def get_col_spec(self):
if self.precision is None:
- return kw_colspec(self, "NUMERIC")
+ return self._extend("NUMERIC")
else:
- return kw_colspec(self, "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length})
+ return self._extend("NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length})
class MSDecimal(MSNumeric):
def get_col_spec(self):
- if self.precision is not None and self.length is not None:
- return kw_colspec(self, "DECIMAL(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length})
+ if self.precision is None:
+ return self._extend("DECIMAL")
+ elif self.length is None:
+ return self._extend("DECIMAL(%(precision)s)" % {'precision': self.precision})
+ else:
+ return self._extend("DECIMAL(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length})
class MSDouble(MSNumeric):
def __init__(self, precision=10, length=2, **kw):
if (precision is None and length is not None) or (precision is not None and length is None):
raise exceptions.ArgumentError("You must specify both precision and length or omit both altogether.")
- self.unsigned = 'unsigned' in kw
- self.zerofill = 'zerofill' in kw
- super(MSDouble, self).__init__(precision, length)
+ super(MSDouble, self).__init__(precision, length, **kw)
def get_col_spec(self):
if self.precision is not None and self.length is not None:
- return "DOUBLE(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length}
+ return self._extend("DOUBLE(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length})
else:
- return kw_colspec(self, "DOUBLE")
+ return self._extend('DOUBLE')
-class MSFloat(sqltypes.Float):
+class MSFloat(sqltypes.Float, _NumericType):
def __init__(self, precision=10, length=None, **kw):
if length is not None:
self.length=length
- self.unsigned = 'unsigned' in kw
- self.zerofill = 'zerofill' in kw
- super(MSFloat, self).__init__(precision)
+ _NumericType.__init__(self, **kw)
+ sqltypes.Float.__init__(self, precision)
def get_col_spec(self):
if hasattr(self, 'length') and self.length is not None:
- return kw_colspec(self, "FLOAT(%(precision)s,%(length)s)" % {'precision': self.precision, 'length' : self.length})
+ return self._extend("FLOAT(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length})
elif self.precision is not None:
- return kw_colspec(self, "FLOAT(%(precision)s)" % {'precision': self.precision})
+ return self._extend("FLOAT(%(precision)s)" % {'precision': self.precision})
else:
- return kw_colspec(self, "FLOAT")
+ return self._extend("FLOAT")
-class MSInteger(sqltypes.Integer):
+class MSInteger(sqltypes.Integer, _NumericType):
def __init__(self, length=None, **kw):
self.length = length
- self.unsigned = 'unsigned' in kw
- self.zerofill = 'zerofill' in kw
- super(MSInteger, self).__init__()
+ _NumericType.__init__(self, **kw)
+ sqltypes.Integer.__init__(self)
def get_col_spec(self):
if self.length is not None:
- return kw_colspec(self, "INTEGER(%(length)s)" % {'length': self.length})
+ return self._extend("INTEGER(%(length)s)" % {'length': self.length})
else:
- return kw_colspec(self, "INTEGER")
+ return self._extend("INTEGER")
class MSBigInteger(MSInteger):
def get_col_spec(self):
if self.length is not None:
- return kw_colspec(self, "BIGINT(%(length)s)" % {'length': self.length})
+ return self._extend("BIGINT(%(length)s)" % {'length': self.length})
else:
- return kw_colspec(self, "BIGINT")
+ return self._extend("BIGINT")
-class MSSmallInteger(sqltypes.Smallinteger):
+class MSSmallInteger(sqltypes.Smallinteger, _NumericType):
def __init__(self, length=None, **kw):
self.length = length
- self.unsigned = 'unsigned' in kw
- self.zerofill = 'zerofill' in kw
- super(MSSmallInteger, self).__init__()
+ _NumericType.__init__(self, **kw)
+ sqltypes.Smallinteger.__init__(self)
+
def get_col_spec(self):
if self.length is not None:
- return kw_colspec(self, "SMALLINT(%(length)s)" % {'length': self.length})
+ return self._extend("SMALLINT(%(length)s)" % {'length': self.length})
else:
- return kw_colspec(self, "SMALLINT")
+ return self._extend("SMALLINT")
class MSDateTime(sqltypes.DateTime):
def get_col_spec(self):
def get_col_spec(self):
return "TIMESTAMP"
-class MSText(sqltypes.TEXT):
- def __init__(self, **kw):
- self.binary = 'binary' in kw
- super(MSText, self).__init__()
+class MSText(sqltypes.TEXT, _StringType):
+ def __init__(self, **kwargs):
+ _StringType.__init__(self, **kwargs)
+ sqltypes.TEXT.__init__(self)
def get_col_spec(self):
- return "TEXT"
+ return self._extend("TEXT")
class MSTinyText(MSText):
def get_col_spec(self):
- if self.binary:
- return "TEXT BINARY"
- else:
- return "TEXT"
+ return self._extend("TINYTEXT")
class MSMediumText(MSText):
def get_col_spec(self):
- if self.binary:
- return "MEDIUMTEXT BINARY"
- else:
- return "MEDIUMTEXT"
+ return self._extend("MEDIUMTEXT")
class MSLongText(MSText):
def get_col_spec(self):
- if self.binary:
- return "LONGTEXT BINARY"
- else:
- return "LONGTEXT"
+ return self._extend("LONGTEXT")
+
+class MSString(sqltypes.String, _StringType):
+ def __init__(self, length, national=False, **kwargs):
+ _StringType.__init__(self, national=national, **kwargs)
+ sqltypes.String.__init__(self, length, kwargs.get('convert_unicode', False))
-class MSString(sqltypes.String):
def get_col_spec(self):
- return "VARCHAR(%(length)s)" % {'length' : self.length}
+ return self._extend("VARCHAR(%(length)s)" % {'length' : self.length})
+
+class MSChar(sqltypes.CHAR, _StringType):
+ def __init__(self, length, national=False, **kwargs):
+ _StringType.__init__(self, national=national, **kwargs)
+ sqltypes.CHAR.__init__(self, length, kwargs.get('convert_unicode', False))
-class MSChar(sqltypes.CHAR):
def get_col_spec(self):
- return "CHAR(%(length)s)" % {'length' : self.length}
+ return self._extend("CHAR(%(length)s)" % {'length' : self.length})
class MSBinary(sqltypes.Binary):
def get_col_spec(self):
return "MEDIUMBLOB"
class MSEnum(MSString):
- def __init__(self, *enums):
+ def __init__(self, *enums, **kw):
self.__enums_hidden = enums
length = 0
strip_enums = []
length=len(a)
strip_enums.append(a)
self.enums = strip_enums
- super(MSEnum, self).__init__(length)
+ super(MSEnum, self).__init__(length, **kw)
def get_col_spec(self):
- return "ENUM(%s)" % ",".join(self.__enums_hidden)
-
+ return self._extend("ENUM(%s)" % ",".join(self.__enums_hidden))
class MSBoolean(sqltypes.Boolean):
def get_col_spec(self):
else:
return value and True or False
+# TODO: NCHAR, NVARCHAR, SET
+
colspecs = {
-# sqltypes.BIGinteger : MSInteger,
sqltypes.Integer : MSInteger,
sqltypes.Smallinteger : MSSmallInteger,
sqltypes.Numeric : MSNumeric,
if match:
tabletype = match.group('ttype')
- fkpat = r'''CONSTRAINT [`"'](?P<name>.+?)[`"'] FOREIGN KEY \((?P<columns>.+?)\) REFERENCES [`"'](?P<reftable>.+?)[`"'] \((?P<refcols>.+?)\)'''
+ # \x27 == ' (single quote) (avoid xemacs syntax highlighting issue)
+ fkpat = r'''CONSTRAINT [`"\x27](?P<name>.+?)[`"\x27] FOREIGN KEY \((?P<columns>.+?)\) REFERENCES [`"\x27](?P<reftable>.+?)[`"\x27] \((?P<refcols>.+?)\)'''
for match in re.finditer(fkpat, desc):
- columns = re.findall(r'''[`"'](.+?)[`"']''', match.group('columns'))
- refcols = [match.group('reftable') + "." + x for x in re.findall(r'''[`"'](.+?)[`"']''', match.group('refcols'))]
+ columns = re.findall(r'''[`"\x27](.+?)[`"\x27]''', match.group('columns'))
+ refcols = [match.group('reftable') + "." + x for x in re.findall(r'''[`"\x27](.+?)[`"\x27]''', match.group('refcols'))]
schema.Table(match.group('reftable'), table.metadata, autoload=True, autoload_with=connection)
constraint = schema.ForeignKeyConstraint(columns, refcols, name=match.group('name'))
table.append_constraint(constraint)
def __init__(self, dialect):
super(MySQLIdentifierPreparer, self).__init__(dialect, initial_quote='`')
+ def _reserved_words(self):
+ return RESERVED_WORDS
+
def _escape_identifier(self, value):
- #TODO: determin MySQL's escaping rules
+ #TODO: determine MySQL's escaping rules
return value
def _fold_identifier_case(self, value):
- #TODO: determin MySQL's case folding rules
+ #TODO: determine MySQL's case folding rules
return value
dialect = MySQLDialect
def testcolumns(self):
expectedResults = { 'int_column': 'int_column INTEGER',
'smallint_column': 'smallint_column SMALLINT',
- 'varchar_column': 'varchar_column VARCHAR(20)',
- 'numeric_column': 'numeric_column NUMERIC(12, 3)',
- 'float_column': 'float_column NUMERIC(25, 2)'
- }
+ 'varchar_column': 'varchar_column VARCHAR(20)',
+ 'numeric_column': 'numeric_column NUMERIC(12, 3)',
+ 'float_column': 'float_column NUMERIC(25, 2)'
+ }
if not db.name=='sqlite' and not db.name=='oracle':
expectedResults['float_column'] = 'float_column FLOAT(25)'
for aCol in testTable.c:
self.assertEquals(expectedResults[aCol.name], db.dialect.schemagenerator(db, None, None).get_column_specification(aCol))
+
+ @testbase.supported('mysql')
+ def test_mysql_numeric(self):
+ import sqlalchemy.databases.mysql as my
+
+ columns = [
+ # column type, args, kwargs, expected ddl
+ # e.g. Column(Integer(10, unsigned=True)) == 'INTEGER(10) UNSIGNED'
+ (my.MSNumeric, [], {},
+ 'NUMERIC(10, 2)'),
+ (my.MSNumeric, [None], {},
+ 'NUMERIC'),
+ (my.MSNumeric, [12], {},
+ 'NUMERIC(12, 2)'),
+ (my.MSNumeric, [12, 4], {'unsigned':True},
+ 'NUMERIC(12, 4) UNSIGNED'),
+ (my.MSNumeric, [12, 4], {'zerofill':True},
+ 'NUMERIC(12, 4) ZEROFILL'),
+ (my.MSNumeric, [12, 4], {'zerofill':True, 'unsigned':True},
+ 'NUMERIC(12, 4) UNSIGNED ZEROFILL'),
+
+ (my.MSDecimal, [], {},
+ 'DECIMAL(10, 2)'),
+ (my.MSDecimal, [None], {},
+ 'DECIMAL'),
+ (my.MSDecimal, [12], {},
+ 'DECIMAL(12, 2)'),
+ (my.MSDecimal, [12, None], {},
+ 'DECIMAL(12)'),
+ (my.MSDecimal, [12, 4], {'unsigned':True},
+ 'DECIMAL(12, 4) UNSIGNED'),
+ (my.MSDecimal, [12, 4], {'zerofill':True},
+ 'DECIMAL(12, 4) ZEROFILL'),
+ (my.MSDecimal, [12, 4], {'zerofill':True, 'unsigned':True},
+ 'DECIMAL(12, 4) UNSIGNED ZEROFILL'),
+
+ (my.MSDouble, [None, None], {},
+ 'DOUBLE'),
+ (my.MSDouble, [12], {},
+ 'DOUBLE(12, 2)'),
+ (my.MSDouble, [12, 4], {'unsigned':True},
+ 'DOUBLE(12, 4) UNSIGNED'),
+ (my.MSDouble, [12, 4], {'zerofill':True},
+ 'DOUBLE(12, 4) ZEROFILL'),
+ (my.MSDouble, [12, 4], {'zerofill':True, 'unsigned':True},
+ 'DOUBLE(12, 4) UNSIGNED ZEROFILL'),
+
+ (my.MSFloat, [], {},
+ 'FLOAT(10)'),
+ (my.MSFloat, [None], {},
+ 'FLOAT'),
+ (my.MSFloat, [12], {},
+ 'FLOAT(12)'),
+ (my.MSFloat, [12, 4], {},
+ 'FLOAT(12, 4)'),
+ (my.MSFloat, [12, 4], {'unsigned':True},
+ 'FLOAT(12, 4) UNSIGNED'),
+ (my.MSFloat, [12, 4], {'zerofill':True},
+ 'FLOAT(12, 4) ZEROFILL'),
+ (my.MSFloat, [12, 4], {'zerofill':True, 'unsigned':True},
+ 'FLOAT(12, 4) UNSIGNED ZEROFILL'),
+
+ (my.MSInteger, [], {},
+ 'INTEGER'),
+ (my.MSInteger, [4], {},
+ 'INTEGER(4)'),
+ (my.MSInteger, [4], {'unsigned':True},
+ 'INTEGER(4) UNSIGNED'),
+ (my.MSInteger, [4], {'zerofill':True},
+ 'INTEGER(4) ZEROFILL'),
+ (my.MSInteger, [4], {'zerofill':True, 'unsigned':True},
+ 'INTEGER(4) UNSIGNED ZEROFILL'),
+
+ (my.MSBigInteger, [], {},
+ 'BIGINT'),
+ (my.MSBigInteger, [4], {},
+ 'BIGINT(4)'),
+ (my.MSBigInteger, [4], {'unsigned':True},
+ 'BIGINT(4) UNSIGNED'),
+ (my.MSBigInteger, [4], {'zerofill':True},
+ 'BIGINT(4) ZEROFILL'),
+ (my.MSBigInteger, [4], {'zerofill':True, 'unsigned':True},
+ 'BIGINT(4) UNSIGNED ZEROFILL'),
+
+ (my.MSSmallInteger, [], {},
+ 'SMALLINT'),
+ (my.MSSmallInteger, [4], {},
+ 'SMALLINT(4)'),
+ (my.MSSmallInteger, [4], {'unsigned':True},
+ 'SMALLINT(4) UNSIGNED'),
+ (my.MSSmallInteger, [4], {'zerofill':True},
+ 'SMALLINT(4) ZEROFILL'),
+ (my.MSSmallInteger, [4], {'zerofill':True, 'unsigned':True},
+ 'SMALLINT(4) UNSIGNED ZEROFILL'),
+ ]
+
+ table_args = ['testMyNumeric', db]
+ for index, spec in enumerate(columns):
+ type_, args, kw, res = spec
+ table_args.append(Column('c%s' % index, type_(*args, **kw)))
+
+ numeric_table = Table(*table_args)
+ gen = db.dialect.schemagenerator(db, None, None)
+
+ for col in numeric_table.c:
+ index = int(col.name[1:])
+ self.assertEquals(gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
+
+ try:
+ numeric_table.create(checkfirst=True)
+ assert True
+ except:
+ raise
+ numeric_table.drop()
+
+
+class CharsetTest(AssertMixin):
+
+ @testbase.supported('mysql')
+ def testcharset(self):
+ import sqlalchemy.databases.mysql as my
+
+ columns = [
+ (my.MSChar, [1], {},
+ 'CHAR(1)'),
+ (my.MSChar, [1], {'binary':True},
+ 'CHAR(1) BINARY'),
+ (my.MSChar, [1], {'ascii':True},
+ 'CHAR(1) ASCII'),
+ (my.MSChar, [1], {'unicode':True},
+ 'CHAR(1) UNICODE'),
+ (my.MSChar, [1], {'ascii':True, 'binary':True},
+ 'CHAR(1) ASCII BINARY'),
+ (my.MSChar, [1], {'unicode':True, 'binary':True},
+ 'CHAR(1) UNICODE BINARY'),
+ (my.MSChar, [1], {'charset':'utf8'},
+ 'CHAR(1) CHARACTER SET utf8'),
+ (my.MSChar, [1], {'charset':'utf8', 'binary':True},
+ 'CHAR(1) CHARACTER SET utf8 BINARY'),
+ (my.MSChar, [1], {'charset':'utf8', 'unicode':True},
+ 'CHAR(1) CHARACTER SET utf8'),
+ (my.MSChar, [1], {'charset':'utf8', 'ascii':True},
+ 'CHAR(1) CHARACTER SET utf8'),
+ (my.MSChar, [1], {'collation': 'utf8_bin'},
+ 'CHAR(1) COLLATE utf8_bin'),
+ (my.MSChar, [1], {'charset': 'utf8', 'collation': 'utf8_bin'},
+ 'CHAR(1) CHARACTER SET utf8 COLLATE utf8_bin'),
+ (my.MSChar, [1], {'charset': 'utf8', 'binary': True},
+ 'CHAR(1) CHARACTER SET utf8 BINARY'),
+ (my.MSChar, [1], {'charset': 'utf8', 'collation': 'utf8_bin',
+ 'binary': True},
+ 'CHAR(1) CHARACTER SET utf8 COLLATE utf8_bin'),
+ (my.MSChar, [1], {'national':True},
+ 'NATIONAL CHAR(1)'),
+ (my.MSChar, [1], {'national':True, 'charset':'utf8'},
+ 'NATIONAL CHAR(1)'),
+ (my.MSChar, [1], {'national':True, 'charset':'utf8', 'binary':True},
+ 'NATIONAL CHAR(1) BINARY'),
+ (my.MSChar, [1], {'national':True, 'binary':True, 'unicode':True},
+ 'NATIONAL CHAR(1) BINARY'),
+ (my.MSChar, [1], {'national':True, 'collation':'utf8_bin'},
+ 'NATIONAL CHAR(1) COLLATE utf8_bin'),
+ (my.MSString, [1], {'charset':'utf8', 'collation':'utf8_bin'},
+ 'VARCHAR(1) CHARACTER SET utf8 COLLATE utf8_bin'),
+ (my.MSString, [1], {'national':True, 'collation':'utf8_bin'},
+ 'NATIONAL VARCHAR(1) COLLATE utf8_bin'),
+ (my.MSTinyText, [], {'charset':'utf8', 'collation':'utf8_bin'},
+ 'TINYTEXT CHARACTER SET utf8 COLLATE utf8_bin'),
+ (my.MSMediumText, [], {'charset':'utf8', 'binary':True},
+ 'MEDIUMTEXT CHARACTER SET utf8 BINARY'),
+ (my.MSLongText, [], {'ascii':True},
+ 'LONGTEXT ASCII'),
+ (my.MSEnum, ["'foo'", "'bar'"], {'unicode':True},
+ '''ENUM('foo','bar') UNICODE''')
+ ]
+
+ table_args = ['testCharset', db]
+ for index, spec in enumerate(columns):
+ type_, args, kw, res = spec
+ table_args.append(Column('c%s' % index, type_(*args, **kw)))
+
+ charset_table = Table(*table_args)
+ gen = db.dialect.schemagenerator(db, None, None)
+
+ for col in charset_table.c:
+ index = int(col.name[1:])
+ self.assertEquals(gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
+
+ try:
+ charset_table.create(checkfirst=True)
+ assert True
+ except:
+ raise
+ charset_table.drop()
class UnicodeTest(AssertMixin):
"""tests the Unicode type. also tests the TypeDecorator with instances in the types package."""
res2 = bool_table.select(bool_table.c.value==False).execute().fetchall()
print res2
assert(res2==[(2, False)])
-
-
+
if __name__ == "__main__":
testbase.main()