- Mark everything in a test suite as failed when setUpAll fails.
- Added test coverage for Unicode table names in metadata.reflect()
- @testing.exclude() filters out tests by server version
- Applied exclude to the test suite, MySQL 4.1 passes again (no XA or SAVEPOINT)
- Removed MySQL charset-setting pool hook- charset=utf8&use_unicode=0 works just as well. (Am I nuts? I'd swear this didn't work before.)
- Finally migrated some old MySQL-tests into the dialect test module
- Corrected 'commit' and 'rollback' logic (and comment) for ancient MySQL versions lacking transactions entirely
- Deprecated the MySQL get_version_info in favor of server_version_info
- Added a big hunk-o-doc for MySQL.
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-import re, datetime, inspect, warnings, weakref, operator
+"""Support for the MySQL database.
+
+SQLAlchemy supports 6 major MySQL versions: 3.23, 4.0, 4.1, 5.0, 5.1 and 6.0,
+with capablities increasing with more modern servers.
+
+Versions 4.1 and higher support the basic SQL functionality that SQLAlchemy
+uses in the ORM and SQL expressions. These versions pass the applicable
+tests in the suite 100%. No heroic measures are taken to work around major
+missing SQL features- if your server version does not support sub-selects, for
+example, they won't work in SQLAlchemy either.
+
+Currently, the only DB-API driver supported is `MySQL-Python` (also referred to
+as `MySQLdb`). Either 1.2.1 or 1.2.2 are recommended. The alpha, beta and
+gamma releases of 1.2.1 and 1.2.2 should be avoided. Support for Jython and
+IronPython is planned.
+
+===================================== ===============
+Feature Minimum Version
+===================================== ===============
+sqlalchemy.orm 4.1.1
+Table Reflection 3.23.x
+DDL Generation 4.1.1
+utf8/Full Unicode Connections 4.1.1
+Transactions 3.23.15
+Two-Phase Transactions 5.0.3
+Nested Transactions 5.0.3
+===================================== ===============
+
+See the official MySQL documentation for detailed information about features
+supported in any given server release.
+
+Many MySQL server installations default to a ``latin1`` encoding for client
+connections. All data sent through the connection will be converted
+into ``latin1``, even if you have ``utf8`` or another character set on your
+tables and columns. With versions 4.1 and higher, you can change the
+connection character set either through server configuration or by passing
+the ``charset`` parameter to ``create_engine``. The ``charset`` option is
+passed through to MySQL-Python and has the side-effect of also enabling
+``use_unicode`` in the driver by default. For regular encoded strings, also
+pass ``use_unicode=0`` in the connection arguments.
+
+Most MySQL server installations have a default table type of `MyISAM`, a
+non-transactional table type. During a transaction, non-transactional
+storage engines do not participate and continue to store table changes in
+autocommit mode. For fully atomic transactions, all participating tables
+must use a transactional engine such as `InnoDB`, `Falcon`, `SolidDB`,
+`PBXT`, etc. Storage engines can be elected when creating tables in
+SQLAlchemy by supplying a ``mysql_engine='whatever'`` to the ``Table``
+constructor. Any MySQL table creation option can be specified in this syntax.
+
+Not all MySQL storage engines support foreign keys. For `MyISAM` and similar
+engines, the information loaded by table reflection will not include foreign
+keys. For these tables, you may supply ``ForeignKeyConstraints`` at reflection
+time::
+
+ Table('mytable', metadata, autoload=True,
+ ForeignKeyConstraint(['other_id'], ['othertable.other_id']))
+
+For normal SQLAlchemy usage, loading this module is unnescesary. It will be
+loaded on-demand when a MySQL connection is needed. If you would like to use
+one of the MySQL-specific or enhanced column types when creating tables with
+your ``Table`` definitions, then you will need to import them from this module::
+
+ from sqlalchemy.databases import mysql
+
+ Table('mytable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('ittybittyblob', mysql.MSTinyBlob),
+ Column('biggy', mysql.MSBigInteger(unsigned=True)))
+
+If you have problems that seem server related, first check that you are
+using the most recent stable MySQL-Python package available. The Database
+Notes page on the wiki at http://sqlalchemy.org is a good resource for timely
+information affecting MySQL in SQLAlchemy.
+"""
+
+import re, datetime, inspect, warnings, weakref, operator, sys
from array import array as _array
from sqlalchemy import sql, schema, ansisql
-from sqlalchemy.engine import default
+from sqlalchemy.engine import base as engine_base, default
import sqlalchemy.types as sqltypes
import sqlalchemy.exceptions as exceptions
import sqlalchemy.util as util
pad value.
length
- Maximum data length, in bytes. If not length is specified, this
+ Maximum data length, in bytes. If length is not specified, this
will generate a BLOB. This usage is deprecated.
"""
def do_execute(self, cursor, statement, parameters, **kwargs):
cursor.execute(statement, parameters)
+ def do_commit(self, connection):
+ """Execute a COMMIT."""
+
+ # COMMIT/ROLLBACK were introduced in 3.23.15.
+ # Yes, we have at least one user who has to talk to these old versions!
+ #
+ # Ignore commit/rollback if support isn't present, otherwise even basic
+ # operations via autocommit fail.
+ try:
+ connection.commit()
+ except:
+ if self._server_version_info(connection) < (3, 23, 15):
+ args = sys.exc_info()[1].args
+ if args and args[0] == 1064:
+ return
+ raise
+
def do_rollback(self, connection):
- # MySQL without InnoDB doesnt support rollback()
+ """Execute a ROLLBACK."""
+
try:
connection.rollback()
except:
- pass
+ if self._server_version_info(connection) < (3, 23, 15):
+ args = sys.exc_info()[1].args
+ if args and args[0] == 1064:
+ return
+ raise
def do_begin_twophase(self, connection, xid):
connection.execute("XA BEGIN %s", xid)
resultset = connection.execute("XA RECOVER")
return [row['data'][0:row['gtrid_length']] for row in resultset]
+ def do_ping(self, connection):
+ connection.ping()
+
def is_disconnect(self, e):
return isinstance(e, self.dbapi.OperationalError) and \
e.args[0] in (2006, 2013, 2014, 2045, 2055)
return False
raise
- def get_version_info(self, connectable):
- """A tuple of the database server version."""
-
- if hasattr(connectable, 'connect'):
- dbapi_con = connectable.connect().connection
- else:
- dbapi_con = connectable
+ def server_version_info(self, connection):
+ """A tuple of the database server version.
+
+ Formats the remote server version as a tuple of version values,
+ e.g. ``(5, 0, 44)``. If there are strings in the version number
+ they will be in the tuple too, so don't count on these all being
+ ``int`` values.
+
+ This is a fast check that does not require a round trip. It is also
+ cached per-Connection.
+ """
+
+ try:
+ return connection.properties['_mysql_server_version_info']
+ except KeyError:
+ version = connection.properties['_mysql_server_version_info'] = \
+ self._server_version_info(connection.connection.connection)
+ return version
+
+ def _server_version_info(self, dbapi_con):
+ """Convert a MySQL-python server_info string into a tuple."""
+
version = []
for n in dbapi_con.get_server_info().split('.'):
try:
version.append(n)
return tuple(version)
+ # @deprecated
+ def get_version_info(self, connectable):
+ """A tuple of the database server version.
+
+ Deprecated, use ``server_version_info()``.
+ """
+
+ if isinstance(connectable, engine_base.Engine):
+ connectable = connectable.contextual_connect()
+
+ return self.server_version_info(connectable)
+ get_version_info = util.deprecated(get_version_info)
+
def reflecttable(self, connection, table, include_columns):
"""Load column definitions from the server."""
# Note: MySQL-python 1.2.1c7 seems to ignore changes made
# on a connection via set_character_set()
- if self.get_version_info(connection) < (4, 1, 0):
+ if self.server_version_info(connection) < (4, 1, 0):
try:
return connection.connection.character_set_name()
except AttributeError:
return connection.properties['collations']
except KeyError:
collations = {}
- if self.get_version_info(connection) < (4, 1, 0):
+ if self.server_version_info(connection) < (4, 1, 0):
pass
else:
rs = connection.execute('SHOW COLLATION')
return text
+# ug. "InnoDB needs indexes on foreign keys and referenced keys [...].
+# Starting with MySQL 4.1.2, these indexes are created automatically.
+# In older versions, the indexes must be created explicitly or the
+# creation of foreign key constraints fails."
+
class MySQLSchemaGenerator(ansisql.ANSISchemaGenerator):
def get_column_specification(self, column, override_pk=False,
first_pk=False):
class MySQLIdentifierPreparer(ansisql.ANSIIdentifierPreparer):
+ """MySQL-specific schema identifier configuration."""
+
def __init__(self, dialect):
super(MySQLIdentifierPreparer, self).__init__(dialect,
initial_quote='`')
# just leave everything as-is.
return value
-class MySQLCharsetOnConnect(object):
- """Use an alternate connection character set automatically."""
-
- def __init__(self, charset, collation=None):
- """Creates a pool listener that decorates new database connections.
-
- Sets the connection character set on MySQL connections. Strings
- sent to and from the server will use this encoding, and if a collation
- is provided it will be used as the default.
-
- There is also a MySQL-python 'charset' keyword for connections,
- however that keyword has the side-effect of turning all strings into
- Unicode.
-
- This class is a ``Pool`` listener. To use, pass an insstance to the
- ``listeners`` argument to create_engine or Pool constructor, or
- manually add it to a pool with ``add_listener()``.
- charset:
- The character set to use
-
- collation:
- Optional, use a non-default collation for the given charset
- """
-
- self.charset = charset
- self.collation = collation
-
- def connect(self, dbapi_con, con_record):
- cr = dbapi_con.cursor()
- try:
- if self.collation is None:
- if hasattr(dbapi_con, 'set_character_set'):
- dbapi_con.set_character_set(self.charset)
- else:
- cr.execute("SET NAMES %s" % self.charset)
- else:
- if hasattr(dbapi_con, 'set_character_set'):
- dbapi_con.set_character_set(self.charset)
- cr.execute("SET NAMES %s COLLATE %s" % (self.charset,
- self.collation))
- # let SQL errors (1064 if SET NAMES is not supported) raise
- finally:
- cr.close()
-
-
dialect = MySQLDialect
def schemadropper(self, *args, **kwargs):
return SQLiteSchemaDropper(self, *args, **kwargs)
+ def server_version_info(self, connection):
+ return self.dbapi.sqlite_version_info
+
def supports_alter(self):
return False
raise NotImplementedError()
+ def server_version_info(self, connection):
+ """Return a tuple of the database's version number."""
+
+ raise NotImplementedError()
+
def reflecttable(self, connection, table, include_columns=None):
"""Load table description from the database.
class TypesTest(AssertMixin):
"Test MySQL column types"
+ @testing.supported('mysql')
+ def test_basic(self):
+ meta1 = MetaData(testbase.db)
+ table = Table(
+ 'mysql_types', meta1,
+ Column('id', Integer, primary_key=True),
+ Column('num1', mysql.MSInteger(unsigned=True)),
+ Column('text1', mysql.MSLongText),
+ Column('text2', mysql.MSLongText()),
+ Column('num2', mysql.MSBigInteger),
+ Column('num3', mysql.MSBigInteger()),
+ Column('num4', mysql.MSDouble),
+ Column('num5', mysql.MSDouble()),
+ Column('enum1', mysql.MSEnum('"black"', '"white"')),
+ )
+ try:
+ table.drop(checkfirst=True)
+ table.create()
+ meta2 = MetaData(testbase.db)
+ t2 = Table('mysql_types', meta2, autoload=True)
+ assert isinstance(t2.c.num1.type, mysql.MSInteger)
+ assert t2.c.num1.type.unsigned
+ assert isinstance(t2.c.text1.type, mysql.MSLongText)
+ assert isinstance(t2.c.text2.type, mysql.MSLongText)
+ assert isinstance(t2.c.num2.type, mysql.MSBigInteger)
+ assert isinstance(t2.c.num3.type, mysql.MSBigInteger)
+ assert isinstance(t2.c.num4.type, mysql.MSDouble)
+ assert isinstance(t2.c.num5.type, mysql.MSDouble)
+ assert isinstance(t2.c.enum1.type, mysql.MSEnum)
+ t2.drop()
+ t2.create()
+ finally:
+ meta1.drop_all()
+
@testing.supported('mysql')
def test_numeric(self):
"Exercise type specification and options for numeric types."
numeric_table.drop()
@testing.supported('mysql')
+ @testing.exclude('mysql', '<', (4, 1, 1))
def test_charset(self):
"""Exercise CHARACTER SET and COLLATE-related options on string-type
columns."""
enum_table.drop()
@testing.supported('mysql')
+ @testing.exclude('mysql', '<', (5, 0, 0))
def test_type_reflection(self):
- # FIXME: older versions need their own test
- if testbase.db.dialect.get_version_info(testbase.db) < (5, 0):
- return
-
# (ask_for, roundtripped_as_if_different)
specs = [( String(), mysql.MSText(), ),
( String(1), mysql.MSString(1), ),
m.drop_all()
-class CharsetHelperTest(PersistTest):
- @testing.supported('mysql')
- def test_basic(self):
- if testbase.db.dialect.get_version_info(testbase.db) < (4, 1):
- return
-
- helper = mysql.MySQLCharsetOnConnect('utf8')
-
- e = create_engine(testbase.db.url, listeners=[helper])
-
- rs = e.execute("SHOW VARIABLES LIKE 'character_set%%'")
- vars = dict([(row[0], row[1]) for row in mysql._compat_fetchall(rs)])
- self.assert_(vars['character_set_client'] == 'utf8')
- self.assert_(vars['character_set_connection'] == 'utf8')
-
- helper.charset = 'latin1'
- e.pool.dispose()
- rs = e.execute("SHOW VARIABLES LIKE 'character_set%%'")
- vars = dict([(row[0], row[1]) for row in mysql._compat_fetchall(rs)])
- self.assert_(vars['character_set_client'] == 'latin1')
- self.assert_(vars['character_set_connection'] == 'latin1')
-
- helper.charset = 'utf8'
- helper.collation = 'utf8_bin'
- e.pool.dispose()
- rs = e.execute("SHOW VARIABLES LIKE 'character_set%%'")
- vars = dict([(row[0], row[1]) for row in mysql._compat_fetchall(rs)])
- self.assert_(vars['character_set_client'] == 'utf8')
- self.assert_(vars['character_set_connection'] == 'utf8')
- rs = e.execute("SHOW VARIABLES LIKE 'collation%%'")
- vars = dict([(row[0], row[1]) for row in mysql._compat_fetchall(rs)])
- self.assert_(vars['collation_connection'] == 'utf8_bin')
if __name__ == "__main__":
testbase.main()
from sqlalchemy import *
import sqlalchemy.ansisql as ansisql
from sqlalchemy.exceptions import NoSuchTableError
-import sqlalchemy.databases.mysql as mysql
from testlib import *
class ReflectionTest(PersistTest):
+
+ @testing.exclude('mysql', '<', (4, 1, 1))
def testbasic(self):
use_function_defaults = testbase.db.engine.name == 'postgres' or testbase.db.engine.name == 'oracle'
use_string_defaults = use_function_defaults or testbase.db.engine.__module__.endswith('sqlite')
- if (testbase.db.engine.name == 'mysql' and
- testbase.db.dialect.get_version_info(testbase.db) < (4, 1, 1)):
- return
-
if use_function_defaults:
defval = func.current_date()
deftype = Date
users.create()
addresses.create()
try:
- # create a join from the two tables, this ensures that
- # theres a foreign key set up
- # previously, we couldnt get foreign keys out of mysql. seems like
- # we can now as long as we use InnoDB
-# if testbase.db.engine.__module__.endswith('mysql'):
- # addresses.c.remote_user_id.append_item(ForeignKey('engine_users.user_id'))
print users
print addresses
j = join(users, addresses)
finally:
meta.drop_all()
-
- @testing.supported('mysql')
- def testmysqltypes(self):
- meta1 = MetaData(testbase.db)
- table = Table(
- 'mysql_types', meta1,
- Column('id', Integer, primary_key=True),
- Column('num1', mysql.MSInteger(unsigned=True)),
- Column('text1', mysql.MSLongText),
- Column('text2', mysql.MSLongText()),
- Column('num2', mysql.MSBigInteger),
- Column('num3', mysql.MSBigInteger()),
- Column('num4', mysql.MSDouble),
- Column('num5', mysql.MSDouble()),
- Column('enum1', mysql.MSEnum('"black"', '"white"')),
- )
- try:
- table.drop(checkfirst=True)
- table.create()
- meta2 = MetaData(testbase.db)
- t2 = Table('mysql_types', meta2, autoload=True)
- assert isinstance(t2.c.num1.type, mysql.MSInteger)
- assert t2.c.num1.type.unsigned
- assert isinstance(t2.c.text1.type, mysql.MSLongText)
- assert isinstance(t2.c.text2.type, mysql.MSLongText)
- assert isinstance(t2.c.num2.type, mysql.MSBigInteger)
- assert isinstance(t2.c.num3.type, mysql.MSBigInteger)
- assert isinstance(t2.c.num4.type, mysql.MSDouble)
- assert isinstance(t2.c.num5.type, mysql.MSDouble)
- assert isinstance(t2.c.enum1.type, mysql.MSEnum)
- t2.drop()
- t2.create()
- finally:
- table.drop(checkfirst=True)
-
def test_pks_not_uniques(self):
"""test that primary key reflection not tripped up by unique indexes"""
testbase.db.execute("drop table django_admin_log")
testbase.db.execute("drop table django_content_type")
+ @testing.exclude('mysql', '<', (4, 1, 1))
def test_composite_fk(self):
"""test reflection of composite foreign keys"""
- if (testbase.db.engine.name == 'mysql' and
- testbase.db.dialect.get_version_info(testbase.db) < (4, 1, 1)):
- return
meta = MetaData(testbase.db)
-
table = Table(
'multi', meta,
Column('multi_id', Integer, primary_key=True),
finally:
meta.drop_all()
+ @testing.exclude('mysql', '<', (4, 1, 1))
def test_to_metadata(self):
meta = MetaData()
Column('name', String(40), nullable=False),
Column('description', String(30), CheckConstraint("description='hi'")),
UniqueConstraint('name'),
- mysql_engine='InnoDB'
+ test_needs_fk=True,
)
table2 = Table('othertable', meta,
Column('id', Integer, primary_key=True),
Column('myid', Integer, ForeignKey('mytable.myid')),
- mysql_engine='InnoDB'
+ test_needs_fk=True,
)
def test_to_metadata():
return (table_c, table2_c)
def test_pickle():
- meta.connect(testbase.db)
+ meta.bind = testbase.db
meta2 = pickle.loads(pickle.dumps(meta))
assert meta2.bind is None
return (meta2.tables['mytable'], meta2.tables['othertable'])
global metadata, users
metadata = MetaData()
users = Table('users', metadata,
- Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key = True),
+ Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key=True),
Column('user_name', String(40)),
)
Column('address_id', Integer, Sequence('address_id_seq', optional=True), primary_key = True),
Column('user_id', Integer, ForeignKey(users.c.user_id)),
Column('email_address', String(40)),
-
)
orders = Table('orders', metadata,
Column('user_id', Integer, ForeignKey(users.c.user_id)),
Column('description', String(50)),
Column('isopen', Integer),
-
)
orderitems = Table('items', metadata,
Column('item_id', INT, Sequence('items_id_seq', optional=True), primary_key = True),
Column('order_id', INT, ForeignKey("orders")),
Column('item_name', VARCHAR(50)),
-
)
def test_sorter( self ):
finally:
metadata.drop_all(bind=testbase.db)
+ @testing.exclude('mysql', '<', (4, 1, 1))
def test_createdrop(self):
metadata.create_all(bind=testbase.db)
self.assertEqual( testbase.db.has_table('items'), True )
- self.assertEqual( testbase.db.has_table('email_addresses'), True )
+ self.assertEqual( testbase.db.has_table('email_addresses'), True )
metadata.create_all(bind=testbase.db)
self.assertEqual( testbase.db.has_table('items'), True )
assert len(result.fetchall()) == 0
connection.close()
+ @testing.exclude('mysql', '<', (5, 0, 3))
def testnestedrollback(self):
connection = testbase.db.connect()
connection.close()
+ @testing.exclude('mysql', '<', (5, 0, 3))
def testnesting(self):
connection = testbase.db.connect()
transaction = connection.begin()
connection.close()
@testing.unsupported('sqlite')
+ @testing.exclude('mysql', '<', (5, 0, 3))
def testnestedsubtransactionrollback(self):
connection = testbase.db.connect()
transaction = connection.begin()
connection.close()
@testing.unsupported('sqlite')
+ @testing.exclude('mysql', '<', (5, 0, 3))
def testnestedsubtransactioncommit(self):
connection = testbase.db.connect()
transaction = connection.begin()
connection.close()
@testing.unsupported('sqlite')
+ @testing.exclude('mysql', '<', (5, 0, 3))
def testrollbacktosubtransaction(self):
connection = testbase.db.connect()
transaction = connection.begin()
connection.close()
@testing.supported('postgres', 'mysql')
+ @testing.exclude('mysql', '<', (5, 0, 3))
def testtwophasetransaction(self):
connection = testbase.db.connect()
connection.close()
@testing.supported('postgres', 'mysql')
+ @testing.exclude('mysql', '<', (5, 0, 3))
def testmixedtransaction(self):
connection = testbase.db.connect()
external_connection.close()
@testing.unsupported('sqlite')
+ @testing.exclude('mysql', '<', (5, 0, 3))
def testnesting(self):
- """tests nesting of tranacstions"""
+ """tests nesting of transactions"""
external_connection = tlengine.connect()
self.assert_(external_connection.connection is not tlengine.contextual_connect().connection)
tlengine.begin()
finally:
external_connection.close()
+ @testing.exclude('mysql', '<', (5, 0, 3))
def testmixednesting(self):
"""tests nesting of transactions off the TLEngine directly inside of
tranasctions off the connection from the TLEngine"""
finally:
external_connection.close()
+ @testing.exclude('mysql', '<', (5, 0, 3))
def testmoremixednesting(self):
"""tests nesting of transactions off the connection from the TLEngine
inside of tranasctions off thbe TLEngine directly."""
finally:
external_connection.close()
+ @testing.exclude('mysql', '<', (5, 0, 3))
def testsessionnesting(self):
class User(object):
pass
assert u2.name =='jack'
assert a not in u2.addresses
+ @testing.exclude('mysql', '<', (5, 0)) # fixme
def test_unicode(self):
"""test that Query.get properly sets up the type for the bind parameter. using unicode would normally fail
on postgres, mysql and oracle unless it is converted to an encoded string"""
raise
@testing.supported('postgres', 'mysql')
+ @testing.exclude('mysql', '<', (5, 0, 3))
def test_twophase(self):
# TODO: mock up a failure condition here
# to ensure a rollback succeeds
assert users.count().scalar() == 1
assert addresses.count().scalar() == 1
-
-
def test_joined_transaction(self):
class User(object):pass
mapper(User, users)
assert len(sess.query(User).select()) == 0
@testing.supported('postgres', 'mysql')
+ @testing.exclude('mysql', '<', (5, 0, 3))
def test_nested_transaction(self):
class User(object):pass
mapper(User, users)
assert len(sess.query(User).select()) == 1
@testing.supported('postgres', 'mysql')
+ @testing.exclude('mysql', '<', (5, 0, 3))
def test_nested_autotrans(self):
class User(object):pass
mapper(User, users)
metadata.create_all()
def tearDown(self):
- t2.delete().execute()
- t1.delete().execute()
+ if metadata.tables:
+ t2.delete().execute()
+ t1.delete().execute()
def tearDownAll(self):
global unicode_bind
if testbase.db.name != 'mysql':
return testbase.db
else:
- # most mysql installations don't default to utf8 connections
- version = testbase.db.dialect.get_version_info(testbase.db)
- if version < (4, 1):
- raise AssertionError("Unicode not supported on MySQL < 4.1")
-
- c = testbase.db.connect()
- if not hasattr(c.connection.connection, 'set_character_set'):
- raise AssertionError(
- "Unicode not supported on this MySQL-python version")
- else:
- c.connection.set_character_set('utf8')
- c.detach()
-
- return c
+ from sqlalchemy.databases import mysql
+ engine = create_engine(testbase.db.url,
+ connect_args={'charset': 'utf8',
+ 'use_unicode': False})
def test_insert(self):
t1.insert().execute({u'méil':1, u'\u6e2c\u8a66':5})
t1.insert().execute({u'méil':2, u'\u6e2c\u8a66':7})
t2.insert().execute({'a':2, 'b':2})
- meta = MetaData(unicode_bind)
- tt1 = Table(t1.name, meta, autoload=True)
- tt2 = Table(t2.name, meta, autoload=True)
+ meta = MetaData(unicode_bind, reflect=True)
+ tt1 = meta.tables[t1.name]
+ tt2 = meta.tables[t2.name]
tt1.insert().execute({u'méil':1, u'\u6e2c\u8a66':5})
tt2.insert().execute({u'méil':1, u'\u6e2c\u8a66':1})
- assert tt1.select(order_by=desc(u'méil')).execute().fetchall() == [(2, 7), (1, 5)]
- assert tt2.select(order_by=desc(u'méil')).execute().fetchall() == [(2, 2), (1, 1)]
+ self.assert_(tt1.select(order_by=desc(u'méil')).execute().fetchall() ==
+ [(2, 7), (1, 5)])
+ self.assert_(tt2.select(order_by=desc(u'méil')).execute().fetchall() ==
+ [(2, 2), (1, 1)])
+ meta.drop_all()
+ metadata.create_all()
def test_mapping(self):
# TODO: this test should be moved to the ORM tests, tests should be
# monkeypatches unittest.TestLoader.suiteClass at import time
import testbase
-import unittest, re, sys, os
+import unittest, re, sys, os, operator
from cStringIO import StringIO
import testlib.config as config
sql, MetaData, clear_mappers = None, None, None
__all__ = 'PersistTest', 'AssertMixin', 'ORMTest'
+_ops = { '<': operator.lt,
+ '>': operator.gt,
+ '==': operator.eq,
+ '!=': operator.ne,
+ '<=': operator.le,
+ '>=': operator.ge,
+ 'in': operator.contains }
+
def unsupported(*dbs):
"""Mark a test as unsupported by one or more database implementations"""
return maybe
return decorate
+def exclude(db, op, spec):
+ """Mark a test as unsupported by specific database server versions.
+
+ Stackable, both with other excludes and supported/unsupported. Examples::
+ # Not supported by mydb versions less than 1, 0
+ @exclude('mydb', '<', (1,0))
+ # Other operators work too
+ @exclude('bigdb', '==', (9,0,9))
+ @exclude('yikesdb', 'in', ((0, 3, 'alpha2'), (0, 3, 'alpha3')))
+ """
+
+ def decorate(fn):
+ fn_name = fn.__name__
+ def maybe(*args, **kw):
+ if config.db.name != db:
+ return fn(*args, **kw)
+
+ have = config.db.dialect.server_version_info(
+ config.db.contextual_connect())
+
+ oper = hasattr(op, '__call__') and op or _ops[op]
+
+ if oper(have, spec):
+ print "'%s' unsupported on DB %s version '%s'" % (
+ fn_name, config.db.name, have)
+ return True
+ else:
+ return fn(*args, **kw)
+ try:
+ maybe.__name__ = fn_name
+ except:
+ pass
+ return maybe
+ return decorate
+
class TestData(object):
"""Tracks SQL expressions as they are executed via an instrumented ExecutionContext."""
if self._initTest is not None:
self._initTest.setUpAll()
except:
- result.addError(self._initTest, self.__exc_info())
- pass
+ # skip tests if global setup fails
+ ex = self.__exc_info()
+ for test in self._tests:
+ result.addError(test, ex)
+ return False
try:
return self.do_run(result)
finally: