--- /dev/null
+import operator
+from nose import SkipTest
+from sqlalchemy.util import decorator
+from test.bootstrap import config
+from sqlalchemy import util
+
+
+def fails_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+
+ @decorator
+ def decorate(fn, *args, **kw):
+ if not predicate():
+ return fn(*args, **kw)
+ else:
+ try:
+ fn(*args, **kw)
+ except Exception, ex:
+ print ("'%s' failed as expected (%s): %s " % (
+ fn.__name__, predicate, str(ex)))
+ return True
+ else:
+ raise AssertionError(
+ "Unexpected success for '%s' (%s)" %
+ (fn.__name__, predicate))
+ return decorate
+
+def skip_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+
+ @decorator
+ def decorate(fn, *args, **kw):
+ if predicate():
+ if reason:
+ msg = "'%s' : %s" % (
+ fn.__name__,
+ reason
+ )
+ else:
+ msg = "'%s': %s" % (
+ fn.__name__, predicate
+ )
+ raise SkipTest(msg)
+ else:
+ return fn(*args, **kw)
+ return decorate
+
+def only_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+ return skip_if(NotPredicate(predicate), reason)
+
+def succeeds_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+ return fails_if(NotPredicate(predicate), reason)
+
+class Predicate(object):
+ @classmethod
+ def as_predicate(cls, predicate):
+ if isinstance(predicate, Predicate):
+ return predicate
+ elif isinstance(predicate, list):
+ return OrPredicate([cls.as_predicate(pred) for pred in predicate])
+ elif isinstance(predicate, tuple):
+ return SpecPredicate(*predicate)
+ elif isinstance(predicate, basestring):
+ return SpecPredicate(predicate, None, None)
+ elif util.callable(predicate):
+ return LambdaPredicate(predicate)
+ else:
+ assert False, "unknown predicate type: %s" % predicate
+
+class SpecPredicate(Predicate):
+ def __init__(self, db, op=None, spec=None, description=None):
+ self.db = db
+ self.op = op
+ self.spec = spec
+ self.description = description
+
+ _ops = {
+ '<': operator.lt,
+ '>': operator.gt,
+ '==': operator.eq,
+ '!=': operator.ne,
+ '<=': operator.le,
+ '>=': operator.ge,
+ 'in': operator.contains,
+ 'between': lambda val, pair: val >= pair[0] and val <= pair[1],
+ }
+
+ def __call__(self, engine=None):
+ if engine is None:
+ engine = config.db
+
+ if "+" in self.db:
+ dialect, driver = self.db.split('+')
+ else:
+ dialect, driver = self.db, None
+
+ if dialect and engine.name != dialect:
+ return False
+ if driver is not None and engine.driver != driver:
+ return False
+
+ if self.op is not None:
+ assert driver is None, "DBAPI version specs not supported yet"
+
+ version = _server_version()
+ oper = hasattr(self.op, '__call__') and self.op \
+ or self._ops[self.op]
+ return oper(version, self.spec)
+ else:
+ return True
+
+ def _as_string(self, negate=False):
+ if self.description is not None:
+ return self.description
+ elif self.op is None:
+ if negate:
+ return "not %s" % self.db
+ else:
+ return "%s" % self.db
+ else:
+ if negate:
+ return "not %s %s %s" % (
+ self.db,
+ self.op,
+ self.spec
+ )
+ else:
+ return "%s %s %s" % (
+ self.db,
+ self.op,
+ self.spec
+ )
+
+ def __str__(self):
+ return self._as_string()
+
+class LambdaPredicate(Predicate):
+ def __init__(self, lambda_, description=None, args=None, kw=None):
+ self.lambda_ = lambda_
+ self.args = args or ()
+ self.kw = kw or {}
+ if description:
+ self.description = description
+ elif lambda_.__doc__:
+ self.description = lambda_.__doc__
+ else:
+ self.description = "custom function"
+
+ def __call__(self):
+ return self.lambda_(*self.args, **self.kw)
+
+ def _as_string(self, negate=False):
+ if negate:
+ return "not " + self.description
+ else:
+ return self.description
+
+ def __str__(self):
+ return self._as_string()
+
+class NotPredicate(Predicate):
+ def __init__(self, predicate):
+ self.predicate = predicate
+
+ def __call__(self, *arg, **kw):
+ return not self.predicate(*arg, **kw)
+
+ def __str__(self):
+ return self.predicate._as_string(True)
+
+class OrPredicate(Predicate):
+ def __init__(self, predicates, description=None):
+ self.predicates = predicates
+ self.description = description
+
+ def __call__(self, *arg, **kw):
+ for pred in self.predicates:
+ if pred(*arg, **kw):
+ self._str = pred
+ return True
+ return False
+
+ _str = None
+
+ def _eval_str(self, negate=False):
+ if self._str is None:
+ if negate:
+ conjunction = " and "
+ else:
+ conjunction = " or "
+ return conjunction.join(p._as_string(negate=negate)
+ for p in self.predicates)
+ else:
+ return self._str._as_string(negate=negate)
+
+ def _negation_str(self):
+ if self.description is not None:
+ return "Not " + (self.description % {"spec": self._str})
+ else:
+ return self._eval_str(negate=True)
+
+ def _as_string(self, negate=False):
+ if negate:
+ return self._negation_str()
+ else:
+ if self.description is not None:
+ return self.description % {"spec": self._str}
+ else:
+ return self._eval_str()
+
+ def __str__(self):
+ return self._as_string()
+
+_as_predicate = Predicate.as_predicate
+
+def _is_excluded(db, op, spec):
+ return SpecPredicate(db, op, spec)()
+
+def _server_version(bind=None):
+ """Return a server_version_info tuple."""
+
+ if bind is None:
+ bind = config.db
+
+ # force metadata to be retrieved
+ conn = bind.connect()
+ version = getattr(bind.dialect, 'server_version_info', ())
+ conn.close()
+ return version
+
+def db_spec(*dbs):
+ return OrPredicate(
+ Predicate.as_predicate(db) for db in dbs
+ )
+
+@decorator
+def future(fn, *args, **kw):
+ return fails_if(LambdaPredicate(fn, *args, **kw), "Future feature")
+
+def fails_on(db, reason):
+ return fails_if(SpecPredicate(db), reason)
+
+def fails_on_everything_except(*dbs):
+ return succeeds_if(
+ OrPredicate([
+ SpecPredicate(db) for db in dbs
+ ])
+ )
+
+def skip(db, reason):
+ return skip_if(SpecPredicate(db), reason)
+
+def only_on(dbs, reason):
+ return only_if(
+ OrPredicate([SpecPredicate(db) for db in util.to_list(dbs)])
+ )
+
+
+def exclude(db, op, spec, reason):
+ return skip_if(SpecPredicate(db, op, spec), reason)
+
+
+def against(*queries):
+ return OrPredicate([
+ Predicate.as_predicate(query)
+ for query in queries
+ ])()
"""
-from testing import \
- _block_unconditionally as no_support, \
- _chain_decorators_on, \
- exclude, \
- emits_warning_on,\
+from exclusions import \
+ skip, \
skip_if,\
+ only_if,\
only_on,\
fails_on,\
fails_on_everything_except,\
- fails_if
+ fails_if,\
+ SpecPredicate
+
+def no_support(db, reason):
+ return SpecPredicate(db, description=reason)
+
+def exclude(db, op, spec, description=None):
+ return SpecPredicate(db, op, spec, description=description)
+
from sqlalchemy import util
from test.lib import config
import testing
import sys
+crashes = skip
+
+
+def _chain_decorators_on(fn, *decorators):
+ for decorator in reversed(decorators):
+ fn = decorator(fn)
+ return fn
+
def deferrable_or_no_constraints(fn):
"""Target database must support derferable constraints."""
- return _chain_decorators_on(
- fn,
+
+ return skip_if([
no_support('firebird', 'not supported by database'),
no_support('mysql', 'not supported by database'),
no_support('mssql', 'not supported by database'),
- )
+ ])(fn)
def foreign_keys(fn):
"""Target database must support foreign keys."""
- return _chain_decorators_on(
- fn,
- no_support('sqlite', 'not supported by database'),
- )
+
+ return skip_if(
+ no_support('sqlite', 'not supported by database')
+ )(fn)
def unbounded_varchar(fn):
"""Target database must support VARCHAR with no length"""
- return _chain_decorators_on(
- fn,
- no_support('firebird', 'not supported by database'),
- no_support('oracle', 'not supported by database'),
- no_support('mysql', 'not supported by database'),
- )
+
+ return skip_if([
+ "firebird", "oracle", "mysql"
+ ], "not supported by database"
+ )(fn)
def boolean_col_expressions(fn):
"""Target database must support boolean expressions as columns"""
- return _chain_decorators_on(
- fn,
+ return skip_if([
no_support('firebird', 'not supported by database'),
no_support('oracle', 'not supported by database'),
no_support('mssql', 'not supported by database'),
no_support('sybase', 'not supported by database'),
no_support('maxdb', 'FIXME: verify not supported by database'),
no_support('informix', 'not supported by database'),
- )
+ ])(fn)
+
+def standalone_binds(fn):
+ """target database/driver supports bound parameters as column expressions
+ without being in the context of a typed column.
+
+ """
+ return skip_if(["firebird", "mssql+mxodbc"],
+ "not supported by driver")(fn)
def identity(fn):
"""Target database must support GENERATED AS IDENTITY or a facsimile.
without requiring pre-execution of a SEQUENCE or other artifact.
"""
- return _chain_decorators_on(
- fn,
- no_support('firebird', 'not supported by database'),
- no_support('oracle', 'not supported by database'),
- no_support('postgresql', 'not supported by database'),
- no_support('sybase', 'not supported by database'),
- )
+ return skip_if(["firebird", "oracle", "postgresql", "sybase"],
+ "not supported by database"
+ )(fn)
+
+def reflectable_autoincrement(fn):
+ """Target database must support tables that can automatically generate
+ PKs assuming they were reflected.
+
+ this is essentially all the DBs in "identity" plus Postgresql, which
+ has SERIAL support. FB and Oracle (and sybase?) require the Sequence to
+ be explicitly added, including if the table was reflected.
+ """
+ return skip_if(["firebird", "oracle", "sybase"],
+ "not supported by database"
+ )(fn)
+
+def binary_comparisons(fn):
+ """target database/driver can allow BLOB/BINARY fields to be compared
+ against a bound parameter value.
+ """
+ return skip_if(["oracle", "mssql"],
+ "not supported by database/driver"
+ )(fn)
def independent_cursors(fn):
- """Target must support simultaneous, independent database cursors on a single connection."""
+ """Target must support simultaneous, independent database cursors
+ on a single connection."""
- return _chain_decorators_on(
- fn,
- no_support('mssql+pyodbc', 'no driver support'),
- no_support('mssql+mxodbc', 'no driver support'),
- )
+ return skip_if(["mssql+pyodbc", "mssql+mxodbc"], "no driver support")
def independent_connections(fn):
"""Target must support simultaneous, independent database connections."""
# This is also true of some configurations of UnixODBC and probably win32
# ODBC as well.
- return _chain_decorators_on(
- fn,
- no_support('sqlite', 'Independent connections disabled when '
- ':memory: connections are used'),
- exclude('mssql', '<', (9, 0, 0),
- 'SQL Server 2005+ is required for independent connections'),
- )
+ return skip_if([
+ no_support("sqlite",
+ "independent connections disabled "
+ "when :memory: connections are used"),
+ exclude("mssql", "<", (9, 0, 0),
+ "SQL Server 2005+ is required for "
+ "independent connections"
+ )
+ ]
+ )(fn)
def updateable_autoincrement_pks(fn):
"""Target must support UPDATE on autoincrement/integer primary key."""
- return _chain_decorators_on(
- fn,
- no_support('mssql', "IDENTITY cols can't be updated"),
- no_support('sybase', "IDENTITY cols can't be updated"),
- )
+
+ return skip_if(["mssql", "sybase"],
+ "IDENTITY columns can't be updated")(fn)
def isolation_level(fn):
return _chain_decorators_on(
fn,
- only_on(('postgresql', 'sqlite', 'mysql'), "DBAPI has no isolation level support"),
+ only_on(('postgresql', 'sqlite', 'mysql'),
+ "DBAPI has no isolation level support"),
fails_on('postgresql+pypostgresql',
'pypostgresql bombs on multiple isolation level calls')
)
def row_triggers(fn):
"""Target must support standard statement-running EACH ROW triggers."""
- return _chain_decorators_on(
- fn,
+
+ return skip_if([
# no access to same table
no_support('mysql', 'requires SUPER priv'),
exclude('mysql', '<', (5, 0, 10), 'not supported by database'),
# huh? TODO: implement triggers for PG tests, remove this
- no_support('postgresql', 'PG triggers need to be implemented for tests'),
- )
+ no_support('postgresql',
+ 'PG triggers need to be implemented for tests'),
+ ])(fn)
def correlated_outer_joins(fn):
- """Target must support an outer join to a subquery which correlates to the parent."""
+ """Target must support an outer join to a subquery which
+ correlates to the parent."""
- return _chain_decorators_on(
- fn,
- no_support('oracle', 'Raises "ORA-01799: a column may not be outer-joined to a subquery"')
- )
+ return skip_if("oracle", 'Raises "ORA-01799: a column may not be '
+ 'outer-joined to a subquery"')(fn)
def update_from(fn):
"""Target must support UPDATE..FROM syntax"""
- return _chain_decorators_on(
- fn,
- only_on(('postgresql', 'mssql', 'mysql'),
- "Backend does not support UPDATE..FROM")
- )
+
+ return only_on(['postgresql', 'mssql', 'mysql'],
+ "Backend does not support UPDATE..FROM")(fn)
+
def savepoints(fn):
"""Target database must support savepoints."""
- return _chain_decorators_on(
- fn,
- no_support('access', 'savepoints not supported'),
- no_support('sqlite', 'savepoints not supported'),
- no_support('sybase', 'savepoints not supported'),
- exclude('mysql', '<', (5, 0, 3), 'savepoints not supported'),
- exclude('informix', '<', (11, 55, 'xC3'), 'savepoints not supported'),
- )
+
+ return skip_if([
+ "access",
+ "sqlite",
+ "sybase",
+ ("mysql", "<", (5, 0, 3)),
+ ("informix", "<", (11, 55, "xC3"))
+ ], "savepoints not supported")(fn)
def denormalized_names(fn):
- """Target database must have 'denormalized', i.e. UPPERCASE as case insensitive names."""
+ """Target database must have 'denormalized', i.e.
+ UPPERCASE as case insensitive names."""
return skip_if(
lambda: not testing.db.dialect.requires_name_normalize,
- "Backend does not require denomralized names."
+ "Backend does not require denormalized names."
)(fn)
def schemas(fn):
- """Target database must support external schemas, and have one named 'test_schema'."""
+ """Target database must support external schemas, and have one
+ named 'test_schema'."""
- return _chain_decorators_on(
- fn,
- no_support('sqlite', 'no schema support'),
- no_support('firebird', 'no schema support')
- )
+ return skip_if([
+ "sqlte",
+ "firebird"
+ ], "no schema support")
def sequences(fn):
"""Target database must support SEQUENCEs."""
- return _chain_decorators_on(
- fn,
- no_support('access', 'no SEQUENCE support'),
- no_support('drizzle', 'no SEQUENCE support'),
- no_support('mssql', 'no SEQUENCE support'),
- no_support('mysql', 'no SEQUENCE support'),
- no_support('sqlite', 'no SEQUENCE support'),
- no_support('sybase', 'no SEQUENCE support'),
- no_support('informix', 'no SEQUENCE support'),
- )
+
+ return only_if([
+ "postgresql", "firebird", "oracle"
+ ], "no SEQUENCE support")(fn)
def update_nowait(fn):
"""Target database must support SELECT...FOR UPDATE NOWAIT"""
- return _chain_decorators_on(
- fn,
- no_support('access', 'no FOR UPDATE NOWAIT support'),
- no_support('firebird', 'no FOR UPDATE NOWAIT support'),
- no_support('mssql', 'no FOR UPDATE NOWAIT support'),
- no_support('mysql', 'no FOR UPDATE NOWAIT support'),
- no_support('sqlite', 'no FOR UPDATE NOWAIT support'),
- no_support('sybase', 'no FOR UPDATE NOWAIT support'),
- )
+ return skip_if(["access", "firebird", "mssql", "mysql", "sqlite", "sybase"],
+ "no FOR UPDATE NOWAIT support"
+ )(fn)
def subqueries(fn):
"""Target database must support subqueries."""
- return _chain_decorators_on(
- fn,
- exclude('mysql', '<', (4, 1, 1), 'no subquery support'),
- )
+
+ return skip_if(exclude('mysql', '<', (4, 1, 1)), 'no subquery support')(fn)
def intersect(fn):
"""Target database must support INTERSECT or equivalent."""
- return _chain_decorators_on(
- fn,
- fails_on('firebird', 'no support for INTERSECT'),
- fails_on('mysql', 'no support for INTERSECT'),
- fails_on('sybase', 'no support for INTERSECT'),
- fails_on('informix', 'no support for INTERSECT'),
- )
+
+ return fails_if([
+ "firebird", "mysql", "sybase", "informix"
+ ], 'no support for INTERSECT')(fn)
def except_(fn):
"""Target database must support EXCEPT or equivalent (i.e. MINUS)."""
- return _chain_decorators_on(
- fn,
- fails_on('firebird', 'no support for EXCEPT'),
- fails_on('mysql', 'no support for EXCEPT'),
- fails_on('sybase', 'no support for EXCEPT'),
- fails_on('informix', 'no support for EXCEPT'),
- )
+ return fails_if([
+ "firebird", "mysql", "sybase", "informix"
+ ], 'no support for EXCEPT')(fn)
def offset(fn):
- """Target database must support some method of adding OFFSET or equivalent to a result set."""
- return _chain_decorators_on(
- fn,
- fails_on('sybase', 'no support for OFFSET or equivalent'),
- )
+ """Target database must support some method of adding OFFSET or
+ equivalent to a result set."""
+ return fails_if([
+ "sybase"
+ ], 'no support for OFFSET or equivalent')(fn)
def window_functions(fn):
- return _chain_decorators_on(
- fn,
- only_on(('postgresql', 'mssql', 'oracle'),
- "Backend does not support window functions"),
- )
+ return only_if([
+ "postgresql", "mssql", "oracle"
+ ], "Backend does not support window functions")(fn)
def returning(fn):
- return _chain_decorators_on(
- fn,
- no_support('access', "'returning' not supported by database"),
- no_support('sqlite', "'returning' not supported by database"),
- no_support('mysql', "'returning' not supported by database"),
- no_support('maxdb', "'returning' not supported by database"),
- no_support('sybase', "'returning' not supported by database"),
- no_support('informix', "'returning' not supported by database"),
- )
+ return only_if(["postgresql", "mssql", "oracle", "firebird"],
+ "'returning' not supported by database"
+ )(fn)
def two_phase_transactions(fn):
"""Target database must support two-phase transactions."""
- return _chain_decorators_on(
- fn,
- no_support('access', 'not supported by database'),
+
+ return skip_if([
+ no_support('access', 'two-phase xact not supported by database'),
no_support('firebird', 'no SA implementation'),
- no_support('maxdb', 'not supported by database'),
- no_support('mssql', 'FIXME: guessing, needs confirmation'),
- no_support('oracle', 'no SA implementation'),
- no_support('drizzle', 'not supported by database'),
- no_support('sqlite', 'not supported by database'),
- no_support('sybase', 'FIXME: guessing, needs confirmation'),
- no_support('postgresql+zxjdbc', 'FIXME: JDBC driver confuses the transaction state, may '
+ no_support('maxdb', 'two-phase xact not supported by database'),
+ no_support('mssql', 'two-phase xact not supported by drivers'),
+ no_support('oracle', 'two-phase xact not implemented in SQLA/oracle'),
+ no_support('drizzle', 'two-phase xact not supported by database'),
+ no_support('sqlite', 'two-phase xact not supported by database'),
+ no_support('sybase', 'two-phase xact not supported by drivers/SQLA'),
+ no_support('postgresql+zxjdbc',
+ 'FIXME: JDBC driver confuses the transaction state, may '
'need separate XA implementation'),
- exclude('mysql', '<', (5, 0, 3), 'not supported by database'),
- )
+ exclude('mysql', '<', (5, 0, 3),
+ 'two-phase xact not supported by database'),
+ ])(fn)
def views(fn):
"""Target database must support VIEWs."""
- return _chain_decorators_on(
- fn,
- no_support('drizzle', 'no VIEW support'),
- )
+
+ return skip_if("drizzle", "no VIEW support")(fn)
def unicode_connections(fn):
"""Target driver must support some encoding of Unicode across the wire."""
# TODO: expand to exclude MySQLdb versions w/ broken unicode
- return _chain_decorators_on(
- fn,
+ return skip_if([
exclude('mysql', '<', (4, 1, 1), 'no unicode connection support'),
- )
+ ])(fn)
def unicode_ddl(fn):
"""Target driver must support some encoding of Unicode across the wire."""
# TODO: expand to exclude MySQLdb versions w/ broken unicode
- return _chain_decorators_on(
- fn,
+ return skip_if([
no_support('maxdb', 'database support flakey'),
no_support('oracle', 'FIXME: no support in database?'),
no_support('sybase', 'FIXME: guessing, needs confirmation'),
no_support('mssql+pymssql', 'no FreeTDS support'),
exclude('mysql', '<', (4, 1, 1), 'no unicode connection support'),
- )
+ ])(fn)
def sane_rowcount(fn):
- return _chain_decorators_on(
- fn,
- skip_if(lambda: not testing.db.dialect.supports_sane_rowcount)
- )
+ return skip_if(
+ lambda: not testing.db.dialect.supports_sane_rowcount,
+ "driver doesn't support 'sane' rowcount"
+ )(fn)
def cextensions(fn):
- return _chain_decorators_on(
- fn,
- skip_if(lambda: not _has_cextensions(), "C extensions not installed")
- )
+ return skip_if(
+ lambda: not _has_cextensions(), "C extensions not installed"
+ )(fn)
+def emulated_lastrowid(fn):
+ """"target dialect retrieves cursor.lastrowid or an equivalent
+ after an insert() construct executes.
+ """
+ return fails_on_everything_except('mysql+mysqldb', 'mysql+oursql',
+ 'sqlite+pysqlite', 'mysql+pymysql',
+ 'mssql+pyodbc', 'mssql+mxodbc')(fn)
+
def dbapi_lastrowid(fn):
- if util.pypy:
- return _chain_decorators_on(
- fn,
- fails_if(lambda:True)
- )
- else:
- return _chain_decorators_on(
- fn,
- fails_on_everything_except('mysql+mysqldb', 'mysql+oursql',
- 'sqlite+pysqlite', 'mysql+pymysql'),
- )
+ """"target backend includes a 'lastrowid' accessor on the DBAPI
+ cursor object.
+
+ """
+ return fails_on_everything_except('mysql+mysqldb', 'mysql+oursql',
+ 'sqlite+pysqlite', 'mysql+pymysql')(fn)
def sane_multi_rowcount(fn):
- return _chain_decorators_on(
- fn,
- skip_if(lambda: not testing.db.dialect.supports_sane_multi_rowcount)
- )
+ return skip_if(
+ lambda: not testing.db.dialect.supports_sane_multi_rowcount,
+ "driver doesn't support 'sane' multi row count"
+ )
def nullsordering(fn):
"""Target backends that support nulls ordering."""
def cpython(fn):
return _chain_decorators_on(
fn,
- skip_if(lambda: util.jython or util.pypy,
+ only_if(lambda: util.cpython,
"cPython interpreter needed"
)
)
-def _has_cextensions():
- try:
- from sqlalchemy import cresultproxy, cprocessors
- return True
- except ImportError:
- return False
-
-def _has_sqlite():
- from sqlalchemy import create_engine
- try:
- e = create_engine('sqlite://')
- return True
- except ImportError:
- return False
-
-def _has_mysql_on_windows():
- return testing.against('mysql') and \
- testing.db.dialect._server_casing == 1
+def predictable_gc(fn):
+ """target platform must remove all cycles unconditionally when
+ gc.collect() is called, as well as clean out unreferenced subclasses.
-def _has_mysql_fully_case_sensitive():
- return testing.against('mysql') and \
- testing.db.dialect._server_casing == 0
+ """
+ return cpython(fn)
def sqlite(fn):
return _chain_decorators_on(
skip_if(lambda: testing.against('postgresql') \
and not testing.db.scalar('SHOW LC_COLLATE').startswith('en'))
)
+
+def selectone(fn):
+ """target driver must support the literal statement 'select 1'"""
+ return _chain_decorators_on(
+ fn,
+ skip_if(lambda: testing.against('oracle'),
+ "non-standard SELECT scalar syntax")
+ )
+
+def _has_cextensions():
+ try:
+ from sqlalchemy import cresultproxy, cprocessors
+ return True
+ except ImportError:
+ return False
+
+def _has_sqlite():
+ from sqlalchemy import create_engine
+ try:
+ e = create_engine('sqlite://')
+ return True
+ except ImportError:
+ return False
+
+def _has_mysql_on_windows():
+ return testing.against('mysql') and \
+ testing.db.dialect._detect_casing(testing.db) == 1
+
+def _has_mysql_fully_case_sensitive():
+ return testing.against('mysql') and \
+ testing.db.dialect._detect_casing(testing.db) == 0
+
"""TestCase and TestSuite artifacts and testing decorators."""
import itertools
-import operator
import re
import sys
import types
from sqlalchemy import exc as sa_exc, util, types as sqltypes, schema, \
pool, orm
from sqlalchemy.engine import default
-from nose import SkipTest
+from exclusions import db_spec, _is_excluded, fails_if, skip_if, future,\
+ fails_on, fails_on_everything_except, skip, only_on, exclude, against,\
+ _server_version
-
-_ops = { '<': operator.lt,
- '>': operator.gt,
- '==': operator.eq,
- '!=': operator.ne,
- '<=': operator.le,
- '>=': operator.ge,
- 'in': operator.contains,
- 'between': lambda val, pair: val >= pair[0] and val <= pair[1],
- }
+crashes = skip
# sugar ('testing.db'); set here by config() at runtime
db = None
# more sugar, installed by __init__
requires = None
-def fails_if(callable_, reason=None):
- """Mark a test as expected to fail if callable_ returns True.
-
- If the callable returns false, the test is run and reported as normal.
- However if the callable returns true, the test is expected to fail and the
- unit test logic is inverted: if the test fails, a success is reported. If
- the test succeeds, a failure is reported.
- """
-
- docstring = getattr(callable_, '__doc__', None) or callable_.__name__
- description = docstring.split('\n')[0]
-
- @decorator
- def decorate(fn, *args, **kw):
- if not callable_():
- return fn(*args, **kw)
- else:
- try:
- fn(*args, **kw)
- except Exception, ex:
- print ("'%s' failed as expected (condition: %s): %s " % (
- fn.__name__, description, str(ex)))
- return True
- else:
- raise AssertionError(
- "Unexpected success for '%s' (condition: %s)" %
- (fn.__name__, description))
- return decorate
-
-@decorator
-def future(fn, *args, **kw):
- """Mark a test as expected to unconditionally fail.
-
- Takes no arguments, omit parens when using as a decorator.
- """
-
- try:
- fn(*args, **kw)
- except Exception, ex:
- print ("Future test '%s' failed as expected: %s " % (
- fn.__name__, str(ex)))
- return True
- else:
- raise AssertionError(
- "Unexpected success for future test '%s'" % fn.__name__)
-
-def db_spec(*dbs):
- dialects = set([x for x in dbs if '+' not in x])
- drivers = set([x[1:] for x in dbs if x.startswith('+')])
- specs = set([tuple(x.split('+')) for x in dbs if '+' in x and x not in drivers])
-
- def check(engine):
- return engine.name in dialects or \
- engine.driver in drivers or \
- (engine.name, engine.driver) in specs
-
- return check
-
-
-def fails_on(dbs, reason):
- """Mark a test as expected to fail on the specified database
- implementation.
-
- Unlike ``crashes``, tests marked as ``fails_on`` will be run
- for the named databases. The test is expected to fail and the unit test
- logic is inverted: if the test fails, a success is reported. If the test
- succeeds, a failure is reported.
- """
-
- spec = db_spec(dbs)
-
- @decorator
- def decorate(fn, *args, **kw):
- if not spec(config.db):
- return fn(*args, **kw)
- else:
- try:
- fn(*args, **kw)
- except Exception, ex:
- print ("'%s' failed as expected on DB implementation "
- "'%s+%s': %s" % (
- fn.__name__, config.db.name, config.db.driver, reason))
- return True
- else:
- raise AssertionError(
- "Unexpected success for '%s' on DB implementation '%s+%s'" %
- (fn.__name__, config.db.name, config.db.driver))
- return decorate
-
-def fails_on_everything_except(*dbs):
- """Mark a test as expected to fail on most database implementations.
-
- Like ``fails_on``, except failure is the expected outcome on all
- databases except those listed.
- """
-
- spec = db_spec(*dbs)
-
- @decorator
- def decorate(fn, *args, **kw):
- if spec(config.db):
- return fn(*args, **kw)
- else:
- try:
- fn(*args, **kw)
- except Exception, ex:
- print ("'%s' failed as expected on DB implementation "
- "'%s+%s': %s" % (
- fn.__name__, config.db.name, config.db.driver, str(ex)))
- return True
- else:
- raise AssertionError(
- "Unexpected success for '%s' on DB implementation '%s+%s'" %
- (fn.__name__, config.db.name, config.db.driver))
- return decorate
-
-def crashes(db, reason):
- """Mark a test as unsupported by a database implementation.
-
- ``crashes`` tests will be skipped unconditionally. Use for feature tests
- that cause deadlocks or other fatal problems.
-
- """
- carp = _should_carp_about_exclusion(reason)
- spec = db_spec(db)
- @decorator
- def decorate(fn, *args, **kw):
- if spec(config.db):
- msg = "'%s' unsupported on DB implementation '%s+%s': %s" % (
- fn.__name__, config.db.name, config.db.driver, reason)
- print msg
- if carp:
- print >> sys.stderr, msg
- return True
- else:
- return fn(*args, **kw)
- return decorate
-
-def _block_unconditionally(db, reason):
- """Mark a test as unsupported by a database implementation.
-
- Will never run the test against any version of the given database, ever,
- no matter what. Use when your assumptions are infallible; past, present
- and future.
-
- """
- carp = _should_carp_about_exclusion(reason)
- spec = db_spec(db)
- @decorator
- def decorate(fn, *args, **kw):
- if spec(config.db):
- msg = "'%s' unsupported on DB implementation '%s+%s': %s" % (
- fn.__name__, config.db.name, config.db.driver, reason)
- raise SkipTest(msg)
- else:
- return fn(*args, **kw)
- return decorate
-
-def only_on(dbs, reason):
- carp = _should_carp_about_exclusion(reason)
- spec = db_spec(*util.to_list(dbs))
- @decorator
- def decorate(fn, *args, **kw):
- if spec(config.db):
- return fn(*args, **kw)
- else:
- msg = "'%s' unsupported on DB implementation '%s+%s': %s" % (
- fn.__name__, config.db.name, config.db.driver, reason)
- raise SkipTest(msg)
- return decorate
-
-def exclude(db, op, spec, reason):
- """Mark a test as unsupported by specific database server versions.
-
- Stackable, both with other excludes and other decorators. Examples::
-
- # Not supported by mydb versions less than 1, 0
- @exclude('mydb', '<', (1,0))
- # Other operators work too
- @exclude('bigdb', '==', (9,0,9))
- @exclude('yikesdb', 'in', ((0, 3, 'alpha2'), (0, 3, 'alpha3')))
-
- """
- carp = _should_carp_about_exclusion(reason)
-
- @decorator
- def decorate(fn, *args, **kw):
- if _is_excluded(db, op, spec):
- msg = "'%s' unsupported on DB %s version '%s': %s" % (
- fn.__name__, config.db.name, _server_version(), reason)
- raise SkipTest(msg)
- else:
- return fn(*args, **kw)
- return decorate
-
-def _should_carp_about_exclusion(reason):
- """Guard against forgotten exclusions."""
- assert reason
- for _ in ('todo', 'fixme', 'xxx'):
- if _ in reason.lower():
- return True
- else:
- if len(reason) < 4:
- return True
-
-def _is_excluded(db, op, spec):
- """Return True if the configured db matches an exclusion specification.
-
- db:
- A dialect name
- op:
- An operator or stringified operator, such as '=='
- spec:
- A value that will be compared to the dialect's server_version_info
- using the supplied operator.
-
- Examples::
- # Not supported by mydb versions less than 1, 0
- _is_excluded('mydb', '<', (1,0))
- # Other operators work too
- _is_excluded('bigdb', '==', (9,0,9))
- _is_excluded('yikesdb', 'in', ((0, 3, 'alpha2'), (0, 3, 'alpha3')))
- """
-
- vendor_spec = db_spec(db)
-
- if not vendor_spec(config.db):
- return False
-
- version = _server_version()
-
- oper = hasattr(op, '__call__') and op or _ops[op]
- return oper(version, spec)
-
-def _server_version(bind=None):
- """Return a server_version_info tuple."""
-
- if bind is None:
- bind = config.db
-
- # force metadata to be retrieved
- conn = bind.connect()
- version = getattr(bind.dialect, 'server_version_info', ())
- conn.close()
- return version
-
-def skip_if(predicate, reason=None):
- """Skip a test if predicate is true."""
- reason = reason or predicate.__name__
- carp = _should_carp_about_exclusion(reason)
-
- @decorator
- def decorate(fn, *args, **kw):
- if predicate():
- msg = "'%s' skipped on DB %s version '%s': %s" % (
- fn.__name__, config.db.name, _server_version(), reason)
- raise SkipTest(msg)
- else:
- return fn(*args, **kw)
- return decorate
def emits_warning(*messages):
"""Mark a test as emitting a warning.
testutil.lazy_gc()
assert not pool._refs, str(pool._refs)
-def against(*queries):
- """Boolean predicate, compares to testing database configuration.
-
- Given one or more dialect names, returns True if one is the configured
- database engine.
-
- Also supports comparison to database version when provided with one or
- more 3-tuples of dialect name, operator, and version specification::
-
- testing.against('mysql', 'postgresql')
- testing.against(('mysql', '>=', (5, 0, 0))
- """
-
- for query in queries:
- if isinstance(query, basestring):
- if db_spec(query)(config.db):
- return True
- else:
- name, op, spec = query
- if not db_spec(name)(config.db):
- continue
-
- have = _server_version()
-
- oper = hasattr(op, '__call__') and op or _ops[op]
- if oper(have, spec):
- return True
- return False
-
-def _chain_decorators_on(fn, *decorators):
- """Apply a series of decorators to fn, returning a decorated function."""
- for decorator in reversed(decorators):
- fn = decorator(fn)
- return fn
def run_as_contextmanager(ctx, fn, *arg, **kw):
"""Run the given function under the given contextmanager,