--- /dev/null
+import operator
+from nose import SkipTest
+from sqlalchemy.util import decorator
+from ..bootstrap import config
+from sqlalchemy import util
+
+
+def fails_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+
+ @decorator
+ def decorate(fn, *args, **kw):
+ if not predicate():
+ return fn(*args, **kw)
+ else:
+ try:
+ fn(*args, **kw)
+ except Exception, ex:
+ print ("'%s' failed as expected (%s): %s " % (
+ fn.__name__, predicate, str(ex)))
+ return True
+ else:
+ raise AssertionError(
+ "Unexpected success for '%s' (%s)" %
+ (fn.__name__, predicate))
+ return decorate
+
+def skip_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+
+ @decorator
+ def decorate(fn, *args, **kw):
+ if predicate():
+ if reason:
+ msg = "'%s' : %s" % (
+ fn.__name__,
+ reason
+ )
+ else:
+ msg = "'%s': %s" % (
+ fn.__name__, predicate
+ )
+ raise SkipTest(msg)
+ else:
+ return fn(*args, **kw)
+ return decorate
+
+def only_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+ return skip_if(NotPredicate(predicate), reason)
+
+def succeeds_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+ return fails_if(NotPredicate(predicate), reason)
+
+class Predicate(object):
+ @classmethod
+ def as_predicate(cls, predicate):
+ if isinstance(predicate, Predicate):
+ return predicate
+ elif isinstance(predicate, list):
+ return OrPredicate([cls.as_predicate(pred) for pred in predicate])
+ elif isinstance(predicate, tuple):
+ return SpecPredicate(*predicate)
+ elif isinstance(predicate, basestring):
+ return SpecPredicate(predicate, None, None)
+ elif util.callable(predicate):
+ return LambdaPredicate(predicate)
+ else:
+ assert False, "unknown predicate type: %s" % predicate
+
+class SpecPredicate(Predicate):
+ def __init__(self, db, op=None, spec=None, description=None):
+ self.db = db
+ self.op = op
+ self.spec = spec
+ self.description = description
+
+ _ops = {
+ '<': operator.lt,
+ '>': operator.gt,
+ '==': operator.eq,
+ '!=': operator.ne,
+ '<=': operator.le,
+ '>=': operator.ge,
+ 'in': operator.contains,
+ 'between': lambda val, pair: val >= pair[0] and val <= pair[1],
+ }
+
+ def __call__(self, engine=None):
+ if engine is None:
+ engine = config.db
+
+ if "+" in self.db:
+ dialect, driver = self.db.split('+')
+ else:
+ dialect, driver = self.db, None
+
+ if engine.name != dialect:
+ return False
+ if driver is not None and engine.driver != driver:
+ return False
+
+ if self.op is not None:
+ assert driver is None, "DBAPI version specs not supported yet"
+
+ version = _server_version()
+ oper = hasattr(self.op, '__call__') and self.op \
+ or self._ops[self.op]
+ return oper(version, self.spec)
+ else:
+ return True
+
+ def _as_string(self, negate=False):
+ if self.description is not None:
+ return self.description
+ elif self.op is None:
+ if negate:
+ return "not %s" % self.db
+ else:
+ return "%s" % self.db
+ else:
+ if negate:
+ return "not %s %s %s" % (
+ self.db,
+ self.op,
+ self.spec
+ )
+ else:
+ return "%s %s %s" % (
+ self.db,
+ self.op,
+ self.spec
+ )
+
+ def __str__(self):
+ return self._as_string()
+
+class LambdaPredicate(Predicate):
+ def __init__(self, lambda_, description=None, args=None, kw=None):
+ self.lambda_ = lambda_
+ self.args = args or ()
+ self.kw = kw or {}
+ if description:
+ self.description = description
+ elif lambda_.__doc__:
+ self.description = lambda_.__doc__
+ else:
+ self.description = "custom function"
+
+ def __call__(self):
+ return self.lambda_(*self.args, **self.kw)
+
+ def _as_string(self, negate=False):
+ if negate:
+ return "not " + self.description
+ else:
+ return self.description
+
+ def __str__(self):
+ return self._as_string()
+
+class NotPredicate(Predicate):
+ def __init__(self, predicate):
+ self.predicate = predicate
+
+ def __call__(self, *arg, **kw):
+ return not self.predicate(*arg, **kw)
+
+ def __str__(self):
+ return self.predicate._as_string(True)
+
+class OrPredicate(Predicate):
+ def __init__(self, predicates, description=None):
+ self.predicates = predicates
+ self.description = description
+
+ def __call__(self, *arg, **kw):
+ for pred in self.predicates:
+ if pred(*arg, **kw):
+ self._str = pred
+ return True
+ return False
+
+ _str = None
+
+ def _eval_str(self, negate=False):
+ if self._str is None:
+ if negate:
+ conjunction = " and "
+ else:
+ conjunction = " or "
+ return conjunction.join(p._as_string(negate=negate)
+ for p in self.predicates)
+ else:
+ return self._str._as_string(negate=negate)
+
+ def _negation_str(self):
+ if self.description is not None:
+ return "Not " + (self.description % {"spec": self._str})
+ else:
+ return self._eval_str(negate=True)
+
+ def _as_string(self, negate=False):
+ if negate:
+ return self._negation_str()
+ else:
+ if self.description is not None:
+ return self.description % {"spec": self._str}
+ else:
+ return self._eval_str()
+
+ def __str__(self):
+ return self._as_string()
+
+_as_predicate = Predicate.as_predicate
+
+def _is_excluded(db, op, spec):
+ return SpecPredicate(db, op, spec)()
+
+def _server_version(bind=None):
+ """Return a server_version_info tuple."""
+
+ if bind is None:
+ bind = config.db
+
+ # force metadata to be retrieved
+ conn = bind.connect()
+ version = getattr(bind.dialect, 'server_version_info', ())
+ conn.close()
+ return version
+
+def db_spec(*dbs):
+ return OrPredicate(
+ Predicate.as_predicate(db) for db in dbs
+ )
+
+@decorator
+def future(fn, *args, **kw):
+ return fails_if(LambdaPredicate(fn, *args, **kw), "Future feature")
+
+def fails_on(db, reason):
+ return fails_if(SpecPredicate(db), reason)
+
+def fails_on_everything_except(*dbs):
+ return succeeds_if(
+ OrPredicate([
+ SpecPredicate(db) for db in dbs
+ ])
+ )
+
+def skip(db, reason):
+ return skip_if(SpecPredicate(db), reason)
+
+def only_on(dbs, reason):
+ return only_if(
+ OrPredicate([SpecPredicate(db) for db in util.to_list(dbs)])
+ )
+
+
+def exclude(db, op, spec, reason):
+ return skip_if(SpecPredicate(db, op, spec), reason)
+
+
+def against(*queries):
+ return OrPredicate([
+ Predicate.as_predicate(query)
+ for query in queries
+ ])()
"""
-from testing import \
- _block_unconditionally as no_support, \
- _chain_decorators_on, \
- exclude, \
- emits_warning_on,\
+from exclusions import \
+ skip, \
skip_if,\
+ only_if,\
only_on,\
fails_on,\
fails_on_everything_except,\
- fails_if
+ fails_if,\
+ SpecPredicate
+
+def no_support(db, reason):
+ return SpecPredicate(db, description=reason)
+
+def exclude(db, op, spec, description=None):
+ return SpecPredicate(db, op, spec, description=description)
+
from sqlalchemy import util
from test.lib import config
import testing
import sys
+crashes = skip
+
+
+def _chain_decorators_on(fn, *decorators):
+ for decorator in reversed(decorators):
+ fn = decorator(fn)
+ return fn
+
def deferrable_or_no_constraints(fn):
"""Target database must support derferable constraints."""
- return _chain_decorators_on(
- fn,
+
+ return skip_if([
no_support('firebird', 'not supported by database'),
no_support('mysql', 'not supported by database'),
no_support('mssql', 'not supported by database'),
- )
+ ])(fn)
def foreign_keys(fn):
"""Target database must support foreign keys."""
- return _chain_decorators_on(
- fn,
- no_support('sqlite', 'not supported by database'),
- )
+
+ return skip_if(
+ no_support('sqlite', 'not supported by database')
+ )(fn)
def unbounded_varchar(fn):
"""Target database must support VARCHAR with no length"""
- return _chain_decorators_on(
- fn,
- no_support('firebird', 'not supported by database'),
- no_support('oracle', 'not supported by database'),
- no_support('mysql', 'not supported by database'),
- )
+
+ return skip_if([
+ "firebird", "oracle", "mysql"
+ ], "not supported by database"
+ )(fn)
def boolean_col_expressions(fn):
"""Target database must support boolean expressions as columns"""
- return _chain_decorators_on(
- fn,
+ return skip_if([
no_support('firebird', 'not supported by database'),
no_support('oracle', 'not supported by database'),
no_support('mssql', 'not supported by database'),
no_support('sybase', 'not supported by database'),
no_support('maxdb', 'FIXME: verify not supported by database'),
no_support('informix', 'not supported by database'),
- )
+ ])(fn)
def standalone_binds(fn):
"""target database/driver supports bound parameters as column expressions
without being in the context of a typed column.
"""
- return _chain_decorators_on(
- fn,
- no_support('firebird', 'not supported by driver'),
- no_support('mssql+mxodbc', 'not supported by driver')
- )
-
+ return skip_if(["firebird", "mssql+mxodbc"],
+ "not supported by driver")(fn)
+
def identity(fn):
"""Target database must support GENERATED AS IDENTITY or a facsimile.
without requiring pre-execution of a SEQUENCE or other artifact.
"""
- return _chain_decorators_on(
- fn,
- no_support('firebird', 'not supported by database'),
- no_support('oracle', 'not supported by database'),
- no_support('postgresql', 'not supported by database'),
- no_support('sybase', 'not supported by database'),
- )
+ return skip_if(["firebird", "oracle", "postgresql", "sybase"],
+ "not supported by database"
+ )(fn)
def reflectable_autoincrement(fn):
"""Target database must support tables that can automatically generate
has SERIAL support. FB and Oracle (and sybase?) require the Sequence to
be explicitly added, including if the table was reflected.
"""
- return _chain_decorators_on(
- fn,
- no_support('firebird', 'not supported by database'),
- no_support('oracle', 'not supported by database'),
- no_support('sybase', 'not supported by database'),
- )
+ return skip_if(["firebird", "oracle", "sybase"],
+ "not supported by database"
+ )(fn)
def binary_comparisons(fn):
"""target database/driver can allow BLOB/BINARY fields to be compared
against a bound parameter value.
"""
- return _chain_decorators_on(
- fn,
- no_support('oracle', 'not supported by database/driver'),
- no_support('mssql', 'not supported by database/driver')
- )
+ return skip_if(["oracle", "mssql"],
+ "not supported by database/driver"
+ )(fn)
def independent_cursors(fn):
- """Target must support simultaneous, independent database cursors on a single connection."""
+ """Target must support simultaneous, independent database cursors
+ on a single connection."""
- return _chain_decorators_on(
- fn,
- no_support('mssql+pyodbc', 'no driver support'),
- no_support('mssql+mxodbc', 'no driver support'),
- )
+ return skip_if(["mssql+pyodbc", "mssql+mxodbc"], "no driver support")
def independent_connections(fn):
"""Target must support simultaneous, independent database connections."""
# This is also true of some configurations of UnixODBC and probably win32
# ODBC as well.
- return _chain_decorators_on(
- fn,
- no_support('sqlite', 'Independent connections disabled when '
- ':memory: connections are used'),
- exclude('mssql', '<', (9, 0, 0),
- 'SQL Server 2005+ is required for independent connections'),
- )
+ return skip_if([
+ no_support("sqlite",
+ "independent connections disabled "
+ "when :memory: connections are used"),
+ exclude("mssql", "<", (9, 0, 0),
+ "SQL Server 2005+ is required for "
+ "independent connections"
+ )
+ ]
+ )(fn)
def updateable_autoincrement_pks(fn):
"""Target must support UPDATE on autoincrement/integer primary key."""
- return _chain_decorators_on(
- fn,
- no_support('mssql', "IDENTITY cols can't be updated"),
- no_support('sybase', "IDENTITY cols can't be updated"),
- )
+
+ return skip_if(["mssql", "sybase"],
+ "IDENTITY columns can't be updated")(fn)
def isolation_level(fn):
return _chain_decorators_on(
fn,
- only_on(('postgresql', 'sqlite', 'mysql'), "DBAPI has no isolation level support"),
+ only_on(('postgresql', 'sqlite', 'mysql'),
+ "DBAPI has no isolation level support"),
fails_on('postgresql+pypostgresql',
'pypostgresql bombs on multiple isolation level calls')
)
def row_triggers(fn):
"""Target must support standard statement-running EACH ROW triggers."""
- return _chain_decorators_on(
- fn,
+
+ return skip_if([
# no access to same table
no_support('mysql', 'requires SUPER priv'),
exclude('mysql', '<', (5, 0, 10), 'not supported by database'),
# huh? TODO: implement triggers for PG tests, remove this
- no_support('postgresql', 'PG triggers need to be implemented for tests'),
- )
+ no_support('postgresql',
+ 'PG triggers need to be implemented for tests'),
+ ])(fn)
def correlated_outer_joins(fn):
- """Target must support an outer join to a subquery which correlates to the parent."""
+ """Target must support an outer join to a subquery which
+ correlates to the parent."""
- return _chain_decorators_on(
- fn,
- no_support('oracle', 'Raises "ORA-01799: a column may not be outer-joined to a subquery"')
- )
+ return skip_if("oracle", 'Raises "ORA-01799: a column may not be '
+ 'outer-joined to a subquery"')(fn)
def update_from(fn):
"""Target must support UPDATE..FROM syntax"""
- return _chain_decorators_on(
- fn,
- only_on(('postgresql', 'mssql', 'mysql'),
- "Backend does not support UPDATE..FROM")
- )
+
+ return only_on(['postgresql', 'mssql', 'mysql'],
+ "Backend does not support UPDATE..FROM")(fn)
+
def savepoints(fn):
"""Target database must support savepoints."""
- return _chain_decorators_on(
- fn,
- no_support('access', 'savepoints not supported'),
- no_support('sqlite', 'savepoints not supported'),
- no_support('sybase', 'savepoints not supported'),
- exclude('mysql', '<', (5, 0, 3), 'savepoints not supported'),
- exclude('informix', '<', (11, 55, 'xC3'), 'savepoints not supported'),
- )
+
+ return skip_if([
+ "access",
+ "sqlite",
+ "sybase",
+ ("mysql", "<", (5, 0, 3)),
+ ("informix", "<", (11, 55, "xC3"))
+ ], "savepoints not supported")(fn)
def denormalized_names(fn):
"""Target database must have 'denormalized', i.e. UPPERCASE as case insensitive names."""
def schemas(fn):
"""Target database must support external schemas, and have one named 'test_schema'."""
- return _chain_decorators_on(
- fn,
- no_support('sqlite', 'no schema support'),
- no_support('firebird', 'no schema support')
- )
+ return skip_if([
+ "sqlte",
+ "firebird"
+ ], "no schema support")
def sequences(fn):
"""Target database must support SEQUENCEs."""
- return _chain_decorators_on(
- fn,
- no_support('access', 'no SEQUENCE support'),
- no_support('drizzle', 'no SEQUENCE support'),
- no_support('mssql', 'no SEQUENCE support'),
- no_support('mysql', 'no SEQUENCE support'),
- no_support('sqlite', 'no SEQUENCE support'),
- no_support('sybase', 'no SEQUENCE support'),
- no_support('informix', 'no SEQUENCE support'),
- )
+
+ return only_if([
+ "postgresql", "firebird", "oracle"
+ ], "no SEQUENCE support")(fn)
def update_nowait(fn):
"""Target database must support SELECT...FOR UPDATE NOWAIT"""
- return _chain_decorators_on(
- fn,
- no_support('access', 'no FOR UPDATE NOWAIT support'),
- no_support('firebird', 'no FOR UPDATE NOWAIT support'),
- no_support('mssql', 'no FOR UPDATE NOWAIT support'),
- no_support('mysql', 'no FOR UPDATE NOWAIT support'),
- no_support('sqlite', 'no FOR UPDATE NOWAIT support'),
- no_support('sybase', 'no FOR UPDATE NOWAIT support'),
- )
+ return skip_if(["access", "firebird", "mssql", "mysql", "sqlite", "sybase"],
+ "no FOR UPDATE NOWAIT support"
+ )(fn)
def subqueries(fn):
"""Target database must support subqueries."""
- return _chain_decorators_on(
- fn,
- exclude('mysql', '<', (4, 1, 1), 'no subquery support'),
- )
+
+ return skip_if(exclude('mysql', '<', (4, 1, 1)), 'no subquery support')(fn)
def intersect(fn):
"""Target database must support INTERSECT or equivalent."""
- return _chain_decorators_on(
- fn,
- fails_on('firebird', 'no support for INTERSECT'),
- fails_on('mysql', 'no support for INTERSECT'),
- fails_on('sybase', 'no support for INTERSECT'),
- fails_on('informix', 'no support for INTERSECT'),
- )
+
+ return fails_if([
+ "firebird", "mysql", "sybase", "informix"
+ ], 'no support for INTERSECT')(fn)
def except_(fn):
"""Target database must support EXCEPT or equivalent (i.e. MINUS)."""
- return _chain_decorators_on(
- fn,
- fails_on('firebird', 'no support for EXCEPT'),
- fails_on('mysql', 'no support for EXCEPT'),
- fails_on('sybase', 'no support for EXCEPT'),
- fails_on('informix', 'no support for EXCEPT'),
- )
+ return fails_if([
+ "firebird", "mysql", "sybase", "informix"
+ ], 'no support for EXCEPT')(fn)
def offset(fn):
- """Target database must support some method of adding OFFSET or equivalent to a result set."""
- return _chain_decorators_on(
- fn,
- fails_on('sybase', 'no support for OFFSET or equivalent'),
- )
+ """Target database must support some method of adding OFFSET or
+ equivalent to a result set."""
+ return fails_if([
+ "sybase"
+ ], 'no support for OFFSET or equivalent')(fn)
def window_functions(fn):
- return _chain_decorators_on(
- fn,
- only_on(('postgresql', 'mssql', 'oracle'),
- "Backend does not support window functions"),
- )
+ return only_if([
+ "postgresql", "mssql", "oracle"
+ ], "Backend does not support window functions")(fn)
def returning(fn):
- return _chain_decorators_on(
- fn,
- no_support('access', "'returning' not supported by database"),
- no_support('sqlite', "'returning' not supported by database"),
- no_support('mysql', "'returning' not supported by database"),
- no_support('maxdb', "'returning' not supported by database"),
- no_support('sybase', "'returning' not supported by database"),
- no_support('informix', "'returning' not supported by database"),
- )
+ return only_if(["postgresql", "mssql", "oracle", "firebird"],
+ "'returning' not supported by database"
+ )(fn)
def two_phase_transactions(fn):
"""Target database must support two-phase transactions."""
- return _chain_decorators_on(
- fn,
+
+ return skip_if([
no_support('access', 'not supported by database'),
no_support('firebird', 'no SA implementation'),
no_support('maxdb', 'not supported by database'),
no_support('drizzle', 'not supported by database'),
no_support('sqlite', 'not supported by database'),
no_support('sybase', 'FIXME: guessing, needs confirmation'),
- no_support('postgresql+zxjdbc', 'FIXME: JDBC driver confuses the transaction state, may '
+ no_support('postgresql+zxjdbc',
+ 'FIXME: JDBC driver confuses the transaction state, may '
'need separate XA implementation'),
exclude('mysql', '<', (5, 0, 3), 'not supported by database'),
- )
+ ])(fn)
def views(fn):
"""Target database must support VIEWs."""
- return _chain_decorators_on(
- fn,
- no_support('drizzle', 'no VIEW support'),
- )
+
+ return skip_if("drizzle", "no VIEW support")(fn)
def unicode_connections(fn):
"""Target driver must support some encoding of Unicode across the wire."""
# TODO: expand to exclude MySQLdb versions w/ broken unicode
- return _chain_decorators_on(
- fn,
+ return skip_if([
exclude('mysql', '<', (4, 1, 1), 'no unicode connection support'),
- )
+ ])(fn)
def unicode_ddl(fn):
"""Target driver must support some encoding of Unicode across the wire."""
# TODO: expand to exclude MySQLdb versions w/ broken unicode
- return _chain_decorators_on(
- fn,
+ return skip_if([
no_support('maxdb', 'database support flakey'),
no_support('oracle', 'FIXME: no support in database?'),
no_support('sybase', 'FIXME: guessing, needs confirmation'),
no_support('mssql+pymssql', 'no FreeTDS support'),
exclude('mysql', '<', (4, 1, 1), 'no unicode connection support'),
- )
+ ])(fn)
def sane_rowcount(fn):
- return _chain_decorators_on(
- fn,
- skip_if(lambda: not testing.db.dialect.supports_sane_rowcount)
- )
+ return skip_if(
+ lambda: not testing.db.dialect.supports_sane_rowcount,
+ "driver doesn't support 'sane' rowcount"
+ )(fn)
def cextensions(fn):
- return _chain_decorators_on(
- fn,
- skip_if(lambda: not _has_cextensions(), "C extensions not installed")
- )
+ return skip_if(
+ lambda: not _has_cextensions(), "C extensions not installed"
+ )(fn)
def emulated_lastrowid(fn):
""""target dialect retrieves cursor.lastrowid or an equivalent
after an insert() construct executes.
"""
- return _chain_decorators_on(
- fn,
- fails_on_everything_except('mysql+mysqldb', 'mysql+oursql',
+ return fails_on_everything_except('mysql+mysqldb', 'mysql+oursql',
'sqlite+pysqlite', 'mysql+pymysql',
- 'mssql+pyodbc', 'mssql+mxodbc'),
- )
+ 'mssql+pyodbc', 'mssql+mxodbc')(fn)
def dbapi_lastrowid(fn):
""""target backend includes a 'lastrowid' accessor on the DBAPI
cursor object.
+
"""
- return _chain_decorators_on(
- fn,
- fails_on_everything_except('mysql+mysqldb', 'mysql+oursql',
- 'sqlite+pysqlite', 'mysql+pymysql'),
- )
+ return fails_on_everything_except('mysql+mysqldb', 'mysql+oursql',
+ 'sqlite+pysqlite', 'mysql+pymysql')(fn)
def sane_multi_rowcount(fn):
- return _chain_decorators_on(
- fn,
- skip_if(lambda: not testing.db.dialect.supports_sane_multi_rowcount)
- )
+ return skip_if(
+ lambda: not testing.db.dialect.supports_sane_multi_rowcount,
+ "driver doesn't support 'sane' multi row count"
+ )
def nullsordering(fn):
"""Target backends that support nulls ordering."""
def cpython(fn):
return _chain_decorators_on(
fn,
- skip_if(lambda: util.jython or util.pypy,
+ only_if(lambda: util.cpython,
"cPython interpreter needed"
)
)
-def _has_cextensions():
- try:
- from sqlalchemy import cresultproxy, cprocessors
- return True
- except ImportError:
- return False
+def predictable_gc(fn):
+ """target platform must remove all cycles unconditionally when
+ gc.collect() is called, as well as clean out unreferenced subclasses.
-def _has_sqlite():
- from sqlalchemy import create_engine
- try:
- e = create_engine('sqlite://')
- return True
- except ImportError:
- return False
-
-def _has_mysql_on_windows():
- return testing.against('mysql') and \
- testing.db.dialect._detect_casing(testing.db) == 1
-
-def _has_mysql_fully_case_sensitive():
- return testing.against('mysql') and \
- testing.db.dialect._detect_casing(testing.db) == 0
+ """
+ return cpython(fn)
def sqlite(fn):
return _chain_decorators_on(
skip_if(lambda: testing.against('oracle'),
"non-standard SELECT scalar syntax")
)
+
+def _has_cextensions():
+ try:
+ from sqlalchemy import cresultproxy, cprocessors
+ return True
+ except ImportError:
+ return False
+
+def _has_sqlite():
+ from sqlalchemy import create_engine
+ try:
+ e = create_engine('sqlite://')
+ return True
+ except ImportError:
+ return False
+
+def _has_mysql_on_windows():
+ return testing.against('mysql') and \
+ testing.db.dialect._detect_casing(testing.db) == 1
+
+def _has_mysql_fully_case_sensitive():
+ return testing.against('mysql') and \
+ testing.db.dialect._detect_casing(testing.db) == 0
+
"""TestCase and TestSuite artifacts and testing decorators."""
import itertools
-import operator
import re
import sys
import types
from sqlalchemy import exc as sa_exc, util, types as sqltypes, schema, \
pool, orm
from sqlalchemy.engine import default
-from nose import SkipTest
+from exclusions import db_spec, _is_excluded, fails_if, skip_if, future,\
+ fails_on, fails_on_everything_except, skip, only_on, exclude, against,\
+ _server_version
-
-_ops = { '<': operator.lt,
- '>': operator.gt,
- '==': operator.eq,
- '!=': operator.ne,
- '<=': operator.le,
- '>=': operator.ge,
- 'in': operator.contains,
- 'between': lambda val, pair: val >= pair[0] and val <= pair[1],
- }
+crashes = skip
# sugar ('testing.db'); set here by config() at runtime
db = None
# more sugar, installed by __init__
requires = None
-def fails_if(callable_, reason=None):
- """Mark a test as expected to fail if callable_ returns True.
-
- If the callable returns false, the test is run and reported as normal.
- However if the callable returns true, the test is expected to fail and the
- unit test logic is inverted: if the test fails, a success is reported. If
- the test succeeds, a failure is reported.
- """
-
- docstring = getattr(callable_, '__doc__', None) or callable_.__name__
- description = docstring.split('\n')[0]
-
- @decorator
- def decorate(fn, *args, **kw):
- if not callable_():
- return fn(*args, **kw)
- else:
- try:
- fn(*args, **kw)
- except Exception, ex:
- print ("'%s' failed as expected (condition: %s): %s " % (
- fn.__name__, description, str(ex)))
- return True
- else:
- raise AssertionError(
- "Unexpected success for '%s' (condition: %s)" %
- (fn.__name__, description))
- return decorate
-
-@decorator
-def future(fn, *args, **kw):
- """Mark a test as expected to unconditionally fail.
-
- Takes no arguments, omit parens when using as a decorator.
- """
-
- try:
- fn(*args, **kw)
- except Exception, ex:
- print ("Future test '%s' failed as expected: %s " % (
- fn.__name__, str(ex)))
- return True
- else:
- raise AssertionError(
- "Unexpected success for future test '%s'" % fn.__name__)
-
-def db_spec(*dbs):
- dialects = set([x for x in dbs if '+' not in x])
- drivers = set([x[1:] for x in dbs if x.startswith('+')])
- specs = set([tuple(x.split('+')) for x in dbs if '+' in x and x not in drivers])
-
- def check(engine):
- return engine.name in dialects or \
- engine.driver in drivers or \
- (engine.name, engine.driver) in specs
-
- return check
-
-
-def fails_on(dbs, reason):
- """Mark a test as expected to fail on the specified database
- implementation.
-
- Unlike ``crashes``, tests marked as ``fails_on`` will be run
- for the named databases. The test is expected to fail and the unit test
- logic is inverted: if the test fails, a success is reported. If the test
- succeeds, a failure is reported.
- """
-
- spec = db_spec(dbs)
-
- @decorator
- def decorate(fn, *args, **kw):
- if not spec(config.db):
- return fn(*args, **kw)
- else:
- try:
- fn(*args, **kw)
- except Exception, ex:
- print ("'%s' failed as expected on DB implementation "
- "'%s+%s': %s" % (
- fn.__name__, config.db.name, config.db.driver, reason))
- return True
- else:
- raise AssertionError(
- "Unexpected success for '%s' on DB implementation '%s+%s'" %
- (fn.__name__, config.db.name, config.db.driver))
- return decorate
-
-def fails_on_everything_except(*dbs):
- """Mark a test as expected to fail on most database implementations.
-
- Like ``fails_on``, except failure is the expected outcome on all
- databases except those listed.
- """
-
- spec = db_spec(*dbs)
-
- @decorator
- def decorate(fn, *args, **kw):
- if spec(config.db):
- return fn(*args, **kw)
- else:
- try:
- fn(*args, **kw)
- except Exception, ex:
- print ("'%s' failed as expected on DB implementation "
- "'%s+%s': %s" % (
- fn.__name__, config.db.name, config.db.driver, str(ex)))
- return True
- else:
- raise AssertionError(
- "Unexpected success for '%s' on DB implementation '%s+%s'" %
- (fn.__name__, config.db.name, config.db.driver))
- return decorate
-
-def crashes(db, reason):
- """Mark a test as unsupported by a database implementation.
-
- ``crashes`` tests will be skipped unconditionally. Use for feature tests
- that cause deadlocks or other fatal problems.
-
- """
- carp = _should_carp_about_exclusion(reason)
- spec = db_spec(db)
- @decorator
- def decorate(fn, *args, **kw):
- if spec(config.db):
- msg = "'%s' unsupported on DB implementation '%s+%s': %s" % (
- fn.__name__, config.db.name, config.db.driver, reason)
- print msg
- if carp:
- print >> sys.stderr, msg
- return True
- else:
- return fn(*args, **kw)
- return decorate
-
-def _block_unconditionally(db, reason):
- """Mark a test as unsupported by a database implementation.
-
- Will never run the test against any version of the given database, ever,
- no matter what. Use when your assumptions are infallible; past, present
- and future.
-
- """
- carp = _should_carp_about_exclusion(reason)
- spec = db_spec(db)
- @decorator
- def decorate(fn, *args, **kw):
- if spec(config.db):
- msg = "'%s' unsupported on DB implementation '%s+%s': %s" % (
- fn.__name__, config.db.name, config.db.driver, reason)
- raise SkipTest(msg)
- else:
- return fn(*args, **kw)
- return decorate
-
-def only_on(dbs, reason):
- carp = _should_carp_about_exclusion(reason)
- spec = db_spec(*util.to_list(dbs))
- @decorator
- def decorate(fn, *args, **kw):
- if spec(config.db):
- return fn(*args, **kw)
- else:
- msg = "'%s' unsupported on DB implementation '%s+%s': %s" % (
- fn.__name__, config.db.name, config.db.driver, reason)
- raise SkipTest(msg)
- return decorate
-
-def exclude(db, op, spec, reason):
- """Mark a test as unsupported by specific database server versions.
-
- Stackable, both with other excludes and other decorators. Examples::
-
- # Not supported by mydb versions less than 1, 0
- @exclude('mydb', '<', (1,0))
- # Other operators work too
- @exclude('bigdb', '==', (9,0,9))
- @exclude('yikesdb', 'in', ((0, 3, 'alpha2'), (0, 3, 'alpha3')))
-
- """
- carp = _should_carp_about_exclusion(reason)
-
- @decorator
- def decorate(fn, *args, **kw):
- if _is_excluded(db, op, spec):
- msg = "'%s' unsupported on DB %s version '%s': %s" % (
- fn.__name__, config.db.name, _server_version(), reason)
- raise SkipTest(msg)
- else:
- return fn(*args, **kw)
- return decorate
-
-def _should_carp_about_exclusion(reason):
- """Guard against forgotten exclusions."""
- assert reason
- for _ in ('todo', 'fixme', 'xxx'):
- if _ in reason.lower():
- return True
- else:
- if len(reason) < 4:
- return True
-
-def _is_excluded(db, op, spec):
- """Return True if the configured db matches an exclusion specification.
-
- db:
- A dialect name
- op:
- An operator or stringified operator, such as '=='
- spec:
- A value that will be compared to the dialect's server_version_info
- using the supplied operator.
-
- Examples::
- # Not supported by mydb versions less than 1, 0
- _is_excluded('mydb', '<', (1,0))
- # Other operators work too
- _is_excluded('bigdb', '==', (9,0,9))
- _is_excluded('yikesdb', 'in', ((0, 3, 'alpha2'), (0, 3, 'alpha3')))
- """
-
- vendor_spec = db_spec(db)
-
- if not vendor_spec(config.db):
- return False
-
- version = _server_version()
-
- oper = hasattr(op, '__call__') and op or _ops[op]
- return oper(version, spec)
-
-def _server_version(bind=None):
- """Return a server_version_info tuple."""
-
- if bind is None:
- bind = config.db
-
- # force metadata to be retrieved
- conn = bind.connect()
- version = getattr(bind.dialect, 'server_version_info', ())
- conn.close()
- return version
-
-def skip_if(predicate, reason=None):
- """Skip a test if predicate is true."""
- reason = reason or predicate.__name__
- carp = _should_carp_about_exclusion(reason)
-
- @decorator
- def decorate(fn, *args, **kw):
- if predicate():
- msg = "'%s' skipped on DB %s version '%s': %s" % (
- fn.__name__, config.db.name, _server_version(), reason)
- raise SkipTest(msg)
- else:
- return fn(*args, **kw)
- return decorate
def emits_warning(*messages):
"""Mark a test as emitting a warning.
testutil.lazy_gc()
assert not pool._refs, str(pool._refs)
-def against(*queries):
- """Boolean predicate, compares to testing database configuration.
-
- Given one or more dialect names, returns True if one is the configured
- database engine.
-
- Also supports comparison to database version when provided with one or
- more 3-tuples of dialect name, operator, and version specification::
-
- testing.against('mysql', 'postgresql')
- testing.against(('mysql', '>=', (5, 0, 0))
- """
-
- for query in queries:
- if isinstance(query, basestring):
- if db_spec(query)(config.db):
- return True
- else:
- name, op, spec = query
- if not db_spec(name)(config.db):
- continue
-
- have = _server_version()
-
- oper = hasattr(op, '__call__') and op or _ops[op]
- if oper(have, spec):
- return True
- return False
-
-def _chain_decorators_on(fn, *decorators):
- """Apply a series of decorators to fn, returning a decorated function."""
- for decorator in reversed(decorators):
- fn = decorator(fn)
- return fn
def run_as_contextmanager(ctx, fn, *arg, **kw):
"""Run the given function under the given contextmanager,