class _NumericType(object):
- "Base for MySQL numeric types."
+ """Base for MySQL numeric types."""
def __init__(self, unsigned=False, zerofill=False, **kw):
self.unsigned = unsigned
class _StringType(object):
- "Base for MySQL string types."
+ """Base for MySQL string types."""
def __init__(self, charset=None, collation=None,
ascii=False, unicode=False, binary=False,
"""MySQL TINYINT type"""
def __init__(self, length=None, **kw):
- """Construct a SMALLINTEGER.
+ """Construct a TINYINT.
+
+ Note: following the usual MySQL conventions, TINYINT(1) columns
+ reflected during Table(..., autoload=True) are treated as
+ Boolean columns.
length
Optional, maximum display width for this number.
return self._extend("SMALLINT")
+class MSBit(sqltypes.TypeEngine):
+ """MySQL BIT type
+
+ This type is for MySQL 5.0.3 or greater for MyISAM, and 5.0.5 or greater for
+ MyISAM, MEMORY, InnoDB and BDB. For older versions, use a MSTinyInteger(1)
+ type.
+ """
+
+ def __init__(self, length=None):
+ self.length = length
+
+ def convert_result_value(self, value, dialect):
+ """Converts MySQL's 64 bit, variable length binary string to a long."""
+
+ if value is not None:
+ v = 0L
+ for i in map(ord, value):
+ v = v << 8 | i
+ value = v
+ return value
+
+ def get_col_spec(self):
+ if self.length is not None:
+ return "BIT(%s)" % self.length
+ else:
+ return "BIT"
+
+
class MSDateTime(sqltypes.DateTime):
"""MySQL DATETIME type"""
"""MySQL TIMESTAMP type
To signal the orm to automatically re-select modified rows to retrieve
- the timestamp, add a PassiveDefault to your column specification:
+ the timestamp, add a PassiveDefault to your column specification::
from sqlalchemy.databases import mysql
- Column('updated', mysql.MSTimeStamp, PassiveDefault(text('CURRENT_TIMESTAMP()')))
+ Column('updated', mysql.MSTimeStamp,
+ PassiveDefault(sql.text('CURRENT_TIMESTAMP')))
+
+ The full range of MySQL 4.1+ TIMESTAMP defaults can be specified in
+ the PassiveDefault::
+
+ PassiveDefault(sql.text('CURRENT TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'))
+
"""
def get_col_spec(self):
return "TIMESTAMP"
-class MSYear(sqltypes.String):
+class MSYear(sqltypes.TypeEngine):
"""MySQL YEAR type, for single byte storage of years 1901-2155"""
def get_col_spec(self):
- if self.length is None:
- return "YEAR"
- else:
- return "YEAR(%d)" % self.length
+ return "YEAR"
class MSText(_StringType, sqltypes.TEXT):
return self._extend("ENUM(%s)" % ",".join(self.__ddl_values))
+class MSSet(MSString):
+ """MySQL SET type."""
+
+ def __init__(self, *values, **kw):
+ """Construct a SET.
+
+ Example::
+
+ Column('myset', MSSet("'foo'", "'bar'", "'baz'"))
+
+ Arguments are:
+
+ values
+ The range of valid values for this SET. Values will be used
+ exactly as they appear when generating schemas.
+
+ charset
+ Optional, a column-level character set for this string
+ value. Takes precendence to 'ascii' or 'unicode' short-hand.
+
+ collation
+ Optional, a column-level collation for this string value.
+ Takes precedence to 'binary' short-hand.
+
+ ascii
+ Defaults to False: short-hand for the ``latin1`` character set,
+ generates ASCII in schema.
+
+ unicode
+ Defaults to False: short-hand for the ``ucs2`` character set,
+ generates UNICODE in schema.
+
+ binary
+ Defaults to False: short-hand, pick the binary collation type
+ that matches the column's character set. Generates BINARY in
+ schema. This does not affect the type of data stored, only the
+ collation of character data.
+ """
+
+ self.__ddl_values = values
+
+ strip_values = []
+ for a in values:
+ if a[0:1] == '"' or a[0:1] == "'":
+ a = a[1:-1]
+ strip_values.append(a)
+
+ self.values = strip_values
+ length = max([len(v) for v in strip_values] + [0])
+ super(MSSet, self).__init__(length, **kw)
+
+ def convert_result_value(self, value, dialect):
+ # The good news:
+ # No ',' quoting issues- commas aren't allowed in SET values
+ # The bad news:
+ # Plenty of driver inconsistencies here.
+ if isinstance(value, util.set_types):
+ # ..some versions convert '' to an empty set
+ if not value:
+ value.add('')
+ # ..some return sets.Set, even for pythons that have __builtin__.set
+ if not isinstance(value, util.Set):
+ value = util.Set(value)
+ return value
+ # ...and some versions return strings
+ if value is not None:
+ return util.Set(value.split(','))
+ else:
+ return value
+
+ def convert_bind_param(self, value, engine):
+ if value is None or isinstance(value, (int, long, basestring)):
+ pass
+ else:
+ if None in value:
+ value = util.Set(value)
+ value.remove(None)
+ value.add('')
+ value = ','.join(value)
+ return super(MSSet, self).convert_bind_param(value, engine)
+
+ def get_col_spec(self):
+ return self._extend("SET(%s)" % ",".join(self.__ddl_values))
+
+
class MSBoolean(sqltypes.Boolean):
+ """MySQL BOOLEAN type."""
+
def get_col_spec(self):
return "BOOL"
else:
return value and True or False
-# TODO: SET, BIT
colspecs = {
sqltypes.Integer: MSInteger,
_BinaryType: _BinaryType,
}
-
ischema_names = {
'bigint': MSBigInteger,
'binary': MSBinary,
- 'blob': MSBlob,
+ 'bit': MSBit,
+ 'blob': MSBlob,
'boolean':MSBoolean,
'char': MSChar,
'date': MSDate,
'nchar': MSNChar,
'nvarchar': MSNVarChar,
'numeric': MSNumeric,
+ 'set': MSSet,
'smallint': MSSmallInteger,
'text': MSText,
'time': MSTime,
'tinytext': MSTinyText,
'varbinary': MSVarBinary,
'varchar': MSString,
+ 'year': MSYear,
}
def descriptor():
if extra_2 is not None:
kw[extra_2] = True
- if args is not None:
- if col_type == 'enum':
+ if args is not None and coltype is not sqltypes.NULLTYPE:
+ if col_type in ('enum', 'set'):
args= args[1:-1]
argslist = args.split(',')
coltype = coltype(*argslist, **kw)
import testbase
+import sets
from sqlalchemy import *
from sqlalchemy.databases import mysql
from testlib import *
for col in numeric_table.c:
index = int(col.name[1:])
- self.assertEquals(gen.get_column_specification(col),
- "%s %s" % (col.name, columns[index][3]))
+ self.assert_eq(gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
try:
numeric_table.create(checkfirst=True)
@testing.supported('mysql')
@testing.exclude('mysql', '<', (4, 1, 1))
def test_charset(self):
- """Exercise CHARACTER SET and COLLATE-related options on string-type
- columns."""
+ """Exercise CHARACTER SET and COLLATE-ish options on string types."""
columns = [
(mysql.MSChar, [1], {},
for col in charset_table.c:
index = int(col.name[1:])
- self.assertEquals(gen.get_column_specification(col),
- "%s %s" % (col.name, columns[index][3]))
+ self.assert_eq(gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
try:
charset_table.create(checkfirst=True)
raise
charset_table.drop()
+ @testing.supported('mysql')
+ @testing.exclude('mysql', '<', (5, 0, 5))
+ def test_bit_50(self):
+ """Exercise BIT types on 5.0+ (not valid for all engine types)"""
+
+ meta = MetaData(testbase.db)
+ bit_table = Table('mysql_bits', meta,
+ Column('b1', mysql.MSBit),
+ Column('b2', mysql.MSBit()),
+ Column('b3', mysql.MSBit(), nullable=False),
+ Column('b4', mysql.MSBit(1)),
+ Column('b5', mysql.MSBit(8)),
+ Column('b6', mysql.MSBit(32)),
+ Column('b7', mysql.MSBit(63)),
+ Column('b8', mysql.MSBit(64)))
+
+ self.assert_eq(colspec(bit_table.c.b1), 'b1 BIT')
+ self.assert_eq(colspec(bit_table.c.b2), 'b2 BIT')
+ self.assert_eq(colspec(bit_table.c.b3), 'b3 BIT NOT NULL')
+ self.assert_eq(colspec(bit_table.c.b4), 'b4 BIT(1)')
+ self.assert_eq(colspec(bit_table.c.b5), 'b5 BIT(8)')
+ self.assert_eq(colspec(bit_table.c.b6), 'b6 BIT(32)')
+ self.assert_eq(colspec(bit_table.c.b7), 'b7 BIT(63)')
+ self.assert_eq(colspec(bit_table.c.b8), 'b8 BIT(64)')
+
+ try:
+ meta.create_all()
+
+ meta2 = MetaData(testbase.db)
+ reflected = Table('mysql_bits', meta2, autoload=True)
+
+ for table in bit_table, reflected:
+
+ def roundtrip(store, expected=None):
+ expected = expected or store
+ table.insert(store).execute()
+ row = list(table.select().execute())[0]
+ try:
+ self.assert_(list(row) == expected)
+ except:
+ print "Storing %s" % store
+ print "Expected %s" % expected
+ print "Found %s" % list(row)
+ raise
+ table.delete().execute()
+
+ roundtrip([0] * 8)
+ roundtrip([None, None, 0, None, None, None, None, None])
+ roundtrip([1] * 8)
+ roundtrip([sql.text("b'1'")] * 8, [1] * 8)
+
+ i = 255
+ roundtrip([0, 0, 0, 0, i, i, i, i])
+ i = 2**32 - 1
+ roundtrip([0, 0, 0, 0, 0, i, i, i])
+ i = 2**63 - 1
+ roundtrip([0, 0, 0, 0, 0, 0, i, i])
+ i = 2**64 - 1
+ roundtrip([0, 0, 0, 0, 0, 0, 0, i])
+ finally:
+ meta.drop_all()
+
+ @testing.supported('mysql')
+ def test_boolean(self):
+ """Test BOOL/TINYINT(1) compatability and reflection."""
+
+ meta = MetaData(testbase.db)
+ bool_table = Table('mysql_bool', meta,
+ Column('b1', BOOLEAN),
+ Column('b2', mysql.MSBoolean),
+ Column('b3', mysql.MSTinyInteger(1)),
+ Column('b4', mysql.MSTinyInteger))
+
+ self.assert_eq(colspec(bool_table.c.b1), 'b1 BOOL')
+ self.assert_eq(colspec(bool_table.c.b2), 'b2 BOOL')
+ self.assert_eq(colspec(bool_table.c.b3), 'b3 TINYINT(1)')
+ self.assert_eq(colspec(bool_table.c.b4), 'b4 TINYINT')
+
+ try:
+ meta.create_all()
+
+ table = bool_table
+ def roundtrip(store, expected=None):
+ expected = expected or store
+ table.insert(store).execute()
+ row = list(table.select().execute())[0]
+ try:
+ self.assert_(list(row) == expected)
+ for i, val in enumerate(expected):
+ if isinstance(val, bool):
+ self.assert_(val is row[i])
+ except:
+ print "Storing %s" % store
+ print "Expected %s" % expected
+ print "Found %s" % list(row)
+ raise
+ table.delete().execute()
+
+
+ roundtrip([None, None, None, None])
+ roundtrip([True, True, 1, 1])
+ roundtrip([False, False, 0, 0])
+ roundtrip([True, True, True, True], [True, True, 1, 1])
+ roundtrip([False, False, 0, 0], [False, False, 0, 0])
+
+ meta2 = MetaData(testbase.db)
+ # replace with reflected
+ table = Table('mysql_bool', meta2, autoload=True)
+ self.assert_eq(colspec(table.c.b3), 'b3 BOOL')
+
+ roundtrip([None, None, None, None])
+ roundtrip([True, True, 1, 1], [True, True, True, 1])
+ roundtrip([False, False, 0, 0], [False, False, False, 0])
+ roundtrip([True, True, True, True], [True, True, True, 1])
+ roundtrip([False, False, 0, 0], [False, False, False, 0])
+ finally:
+ meta.drop_all()
+
+ @testing.supported('mysql')
+ @testing.exclude('mysql', '<', (4, 1, 0))
+ def test_timestamp(self):
+ """Exercise funky TIMESTAMP default syntax."""
+
+ meta = MetaData(testbase.db)
+
+ try:
+ columns = [
+ ([TIMESTAMP],
+ 'TIMESTAMP'),
+ ([mysql.MSTimeStamp],
+ 'TIMESTAMP'),
+ ([mysql.MSTimeStamp,
+ PassiveDefault(sql.text('CURRENT_TIMESTAMP'))],
+ "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"),
+ ([mysql.MSTimeStamp,
+ PassiveDefault(sql.text("'1999-09-09 09:09:09'"))],
+ "TIMESTAMP DEFAULT '1999-09-09 09:09:09'"),
+ ([mysql.MSTimeStamp,
+ PassiveDefault(sql.text("'1999-09-09 09:09:09' "
+ "ON UPDATE CURRENT_TIMESTAMP"))],
+ "TIMESTAMP DEFAULT '1999-09-09 09:09:09' "
+ "ON UPDATE CURRENT_TIMESTAMP"),
+ ([mysql.MSTimeStamp,
+ PassiveDefault(sql.text("CURRENT_TIMESTAMP "
+ "ON UPDATE CURRENT_TIMESTAMP"))],
+ "TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
+ "ON UPDATE CURRENT_TIMESTAMP"),
+ ]
+ for idx, (spec, expected) in enumerate(columns):
+ t = Table('mysql_ts%s' % idx, meta,
+ Column('id', Integer, primary_key=True),
+ Column('t', *spec))
+ self.assert_eq(colspec(t.c.t), "t %s" % expected)
+ t.create()
+ r = Table('mysql_ts%s' % idx, MetaData(testbase.db),
+ autoload=True)
+ if len(spec) > 1:
+ self.assert_(r.c.t is not None)
+ finally:
+ meta.drop_all()
+
+ @testing.supported('mysql')
+ def test_year(self):
+ """Exercise YEAR."""
+
+ meta = MetaData(testbase.db)
+ year_table = Table('mysql_year', meta,
+ Column('y1', mysql.MSYear),
+ Column('y2', mysql.MSYear),
+ Column('y3', mysql.MSYear),
+ Column('y4', mysql.MSYear),
+ Column('y5', mysql.MSYear))
+
+ try:
+ year_table.create()
+ reflected = Table('mysql_year', MetaData(testbase.db),
+ autoload=True)
+
+ for table in year_table, reflected:
+ table.insert(['1950', '50', None, 50, 1950]).execute()
+ row = list(table.select().execute())[0]
+ self.assert_eq(list(row), [1950, 2050, None, 2050, 1950])
+ table.delete().execute()
+ finally:
+ meta.drop_all()
+
+
+ @testing.supported('mysql')
+ def test_set(self):
+ """Exercise the SET type."""
+
+ meta = MetaData(testbase.db)
+ set_table = Table('mysql_set', meta,
+ Column('s1', mysql.MSSet('"dq"', "'sq'")),
+ Column('s2', mysql.MSSet("'a'")),
+ Column('s3', mysql.MSSet("'5'", "'7'", "'9'")))
+
+ self.assert_eq(colspec(set_table.c.s1), """s1 SET("dq",'sq')""")
+ self.assert_eq(colspec(set_table.c.s2), "s2 SET('a')")
+ self.assert_eq(colspec(set_table.c.s3), "s3 SET('5','7','9')")
+
+ try:
+ set_table.create()
+ reflected = Table('mysql_set', MetaData(testbase.db),
+ autoload=True)
+
+ for table in set_table, reflected:
+ def roundtrip(store, expected=None):
+ expected = expected or store
+ table.insert(store).execute()
+ row = list(table.select().execute())[0]
+ try:
+ self.assert_(list(row) == expected)
+ except:
+ print "Storing %s" % store
+ print "Expected %s" % expected
+ print "Found %s" % list(row)
+ raise
+ table.delete().execute()
+
+ roundtrip([None, None, None],[None] * 3)
+ roundtrip(['', '', ''], [set([''])] * 3)
+
+ roundtrip([set(['dq']), set(['a']), set(['5'])])
+ roundtrip(['dq', 'a', '5'],
+ [set(['dq']), set(['a']), set(['5'])])
+ roundtrip([1, 1, 1],
+ [set(['dq']), set(['a']), set(['5'])])
+ roundtrip([set(['dq', 'sq']), None, set(['9', '5', '7'])])
+ finally:
+ meta.drop_all()
+
@testing.supported('mysql')
def test_enum(self):
- "Exercise the ENUM type"
+ """Exercise the ENUM type."""
db = testbase.db
enum_table = Table('mysql_enum', MetaData(testbase.db),
Column('e1', mysql.MSEnum('"a"', "'b'")),
- Column('e2', mysql.MSEnum('"a"', "'b'"), nullable=False),
+ Column('e2', mysql.MSEnum('"a"', "'b'"),
+ nullable=False),
Column('e3', mysql.MSEnum('"a"', "'b'", strict=True)),
- Column('e4', mysql.MSEnum('"a"', "'b'", strict=True), nullable=False))
- spec = lambda c: db.dialect.schemagenerator(db, None, None).get_column_specification(c)
-
- self.assertEqual(spec(enum_table.c.e1), """e1 ENUM("a",'b')""")
- self.assertEqual(spec(enum_table.c.e2), """e2 ENUM("a",'b') NOT NULL""")
- self.assertEqual(spec(enum_table.c.e3), """e3 ENUM("a",'b')""")
- self.assertEqual(spec(enum_table.c.e4), """e4 ENUM("a",'b') NOT NULL""")
+ Column('e4', mysql.MSEnum('"a"', "'b'", strict=True),
+ nullable=False))
+
+ self.assert_eq(colspec(enum_table.c.e1),
+ """e1 ENUM("a",'b')""")
+ self.assert_eq(colspec(enum_table.c.e2),
+ """e2 ENUM("a",'b') NOT NULL""")
+ self.assert_eq(colspec(enum_table.c.e3),
+ """e3 ENUM("a",'b')""")
+ self.assert_eq(colspec(enum_table.c.e4),
+ """e4 ENUM("a",'b') NOT NULL""")
enum_table.drop(checkfirst=True)
enum_table.create()
e.append(tuple([convert(c) for c in row]))
expected = e
- self.assertEqual(res, expected)
+ self.assert_eq(res, expected)
enum_table.drop()
@testing.supported('mysql')
m.drop_all()
+ def assert_eq(self, got, wanted):
+ if got != wanted:
+ print "Expected %s" % wanted
+ print "Found %s" % got
+ self.assertEqual(got, wanted)
+
+
+def colspec(c):
+ return testbase.db.dialect.schemagenerator(
+ testbase.db, None, None).get_column_specification(c)
if __name__ == "__main__":
testbase.main()