"full" - runs a full py.test
- "coverage" - runs a full py.test plus coverage, minus memusage
-
- "lightweight" - runs tests without the very heavy "memusage" tests, without
- coverage. Suitable running tests against pypy and for parallel testing.
-
- "memusage" - runs only the memusage tests (very slow and heavy)
+ "coverage" - runs a py.test plus coverage, skipping memory/timing
+ intensive tests
"pep8" - runs flake8 against the codebase (useful with --diff to check
against a patch)
After installing pytest-xdist, testing is run adding the -n<num> option.
For example, to run against sqlite, mysql, postgresql with four processes::
- tox -e lightweight -- -n 4 --db sqlite --db postgresql --db mysql
+ tox -e -- -n 4 --exclude-tags memory-intensive --db sqlite --db postgresql --db mysql
Each backend has a different scheme for setting up the database. Postgresql
still needs the "test_schema" and "test_schema_2" schemas present, as the
@classmethod
def set_as_current(cls, config, namespace):
- global db, _current, db_url, test_schema, test_schema_2
+ global db, _current, db_url, test_schema, test_schema_2, db_opts
_current = config
db_url = config.db.url
+ db_opts = config.db_opts
test_schema = config.test_schema
test_schema_2 = config.test_schema_2
namespace.db = db = config.db
def __init__(self):
self.fails = set()
self.skips = set()
+ self.tags = set()
def __add__(self, other):
return self.add(other)
copy = compound()
copy.fails.update(self.fails)
copy.skips.update(self.skips)
+ copy.tags.update(self.tags)
for other in others:
copy.fails.update(other.fails)
copy.skips.update(other.skips)
+ copy.tags.update(other.tags)
return copy
def not_(self):
copy = compound()
copy.fails.update(NotPredicate(fail) for fail in self.fails)
copy.skips.update(NotPredicate(skip) for skip in self.skips)
+ copy.tags.update(self.tags)
return copy
@property
if predicate(config)
]
+ def include_test(self, include_tags, exclude_tags):
+ return bool(
+ not self.tags.intersection(exclude_tags) and
+ (not include_tags or self.tags.intersection(include_tags))
+ )
+
+ def _extend(self, other):
+ self.skips.update(other.skips)
+ self.fails.update(other.fails)
+ self.tags.update(other.tags)
+
def __call__(self, fn):
if hasattr(fn, '_sa_exclusion_extend'):
- fn._sa_exclusion_extend(self)
+ fn._sa_exclusion_extend._extend(self)
return fn
- def extend(other):
- self.skips.update(other.skips)
- self.fails.update(other.fails)
-
@decorator
def decorate(fn, *args, **kw):
return self._do(config._current, fn, *args, **kw)
decorated = decorate(fn)
- decorated._sa_exclusion_extend = extend
+ decorated._sa_exclusion_extend = self
return decorated
-
@contextlib.contextmanager
def fail_if(self):
all_fails = compound()
)
+def requires_tag(tagname):
+ return tags([tagname])
+
+
+def tags(tagnames):
+ comp = compound()
+ comp.tags.update(tagnames)
+ return comp
+
+
def only_if(predicate, reason=None):
predicate = _as_predicate(predicate)
return skip_if(NotPredicate(predicate), reason)
from nose.plugins import Plugin
fixtures = None
+py3k = sys.version_info >= (3, 0)
# no package imports yet! this prevents us from tripping coverage
# too soon.
path = os.path.join(os.path.dirname(__file__), "plugin_base.py")
return ""
def wantFunction(self, fn):
- if fn.__module__ is None:
- return False
- if fn.__module__.startswith('sqlalchemy.testing'):
- return False
+ return False
+
+ def wantMethod(self, fn):
+ if py3k:
+ cls = fn.__self__.cls
+ else:
+ cls = fn.im_class
+ return plugin_base.want_method(cls, fn)
def wantClass(self, cls):
return plugin_base.want_class(cls)
logging = None
db_opts = {}
+include_tags = set()
+exclude_tags = set()
options = None
dest="cdecimal", default=False,
help="Monkeypatch the cdecimal library into Python 'decimal' "
"for all tests")
- make_option("--serverside", action="callback",
- callback=_server_side_cursors,
+ make_option("--include-tag", action="callback", callback=_include_tag,
+ type="string",
+ help="Include tests with tag <tag>")
+ make_option("--exclude-tag", action="callback", callback=_exclude_tag,
+ type="string",
+ help="Exclude tests with tag <tag>")
+ make_option("--serverside", action="store_true",
help="Turn on server side cursors for PG")
make_option("--mysql-engine", action="store",
dest="mysql_engine", default=None,
def configure_follower(follower_ident):
+ """Configure required state for a follower.
+
+ This invokes in the parent process and typically includes
+ database creation.
+
+ """
global FOLLOWER_IDENT
FOLLOWER_IDENT = follower_ident
+def memoize_important_follower_config(dict_):
+ """Store important configuration we will need to send to a follower.
+
+ This invokes in the parent process after normal config is set up.
+
+ This is necessary as py.test seems to not be using forking, so we
+ start with nothing in memory, *but* it isn't running our argparse
+ callables, so we have to just copy all of that over.
+
+ """
+ dict_['memoized_config'] = {
+ 'db_opts': db_opts,
+ 'include_tags': include_tags,
+ 'exclude_tags': exclude_tags
+ }
+
+
+def restore_important_follower_config(dict_):
+ """Restore important configuration needed by a follower.
+
+ This invokes in the follower process.
+
+ """
+ global db_opts, include_tags, exclude_tags
+ db_opts.update(dict_['memoized_config']['db_opts'])
+ include_tags.update(dict_['memoized_config']['include_tags'])
+ exclude_tags.update(dict_['memoized_config']['exclude_tags'])
+ print "EXCLUDE TAGS!!!!!", exclude_tags
+
+
def read_config():
global file_config
file_config = configparser.ConfigParser()
from sqlalchemy import util
-
def _log(opt_str, value, parser):
global logging
if not logging:
sys.exit(0)
-def _server_side_cursors(opt_str, value, parser):
- db_opts['server_side_cursors'] = True
-
-
def _requirements_opt(opt_str, value, parser):
_setup_requirements(value)
+def _exclude_tag(opt_str, value, parser):
+ exclude_tags.add(value.replace('-', '_'))
+
+
+def _include_tag(opt_str, value, parser):
+ include_tags.add(value.replace('-', '_'))
+
pre_configure = []
post_configure = []
return fn
-
@pre
def _setup_options(opt, file_config):
global options
options = opt
+@pre
+def _server_side_cursors(options, file_config):
+ if options.serverside:
+ db_opts['server_side_cursors'] = True
+
+
@pre
def _monkeypatch_cdecimal(options, file_config):
if options.cdecimal:
@post
def _engine_uri(options, file_config):
- from sqlalchemy.testing import engines, config
+ from sqlalchemy.testing import config
from sqlalchemy import testing
+ from sqlalchemy.testing.plugin import provision
if options.dburi:
db_urls = list(options.dburi)
if not db_urls:
db_urls.append(file_config.get('db', 'default'))
- from . import provision
-
for db_url in db_urls:
cfg = provision.setup_config(
db_url, db_opts, options, file_config, FOLLOWER_IDENT)
if not config._current:
cfg.set_as_current(cfg, testing)
- config.db_opts = db_opts
-
-
-
@post
def _engine_pool(options, file_config):
return True
+def want_method(cls, fn):
+ if cls.__name__ == 'PoolFirstConnectSyncTest' and fn.__name__ == 'test_sync':
+ assert exclude_tags
+ assert hasattr(fn, '_sa_exclusion_extend')
+ assert not fn._sa_exclusion_extend.include_test(include_tags, exclude_tags)
+
+ if fn.__module__ is None:
+ return False
+ elif fn.__module__.startswith('sqlalchemy.testing'):
+ return False
+ elif include_tags:
+ return (
+ hasattr(cls, '__tags__') and
+ exclusions.tags(cls.__tags__).include_test(
+ include_tags, exclude_tags)
+ ) or (
+ hasattr(fn, '_sa_exclusion_extend') and
+ fn._sa_exclusion_extend.include_test(
+ include_tags, exclude_tags)
+ )
+ elif exclude_tags and hasattr(cls, '__tags__'):
+ return exclusions.tags(cls.__tags__).include_test(
+ include_tags, exclude_tags)
+ elif exclude_tags and hasattr(fn, '_sa_exclusion_extend'):
+ return fn._sa_exclusion_extend.include_test(include_tags, exclude_tags)
+ else:
+ return fn.__name__.startswith("test_")
+
+
def generate_sub_tests(cls, module):
if getattr(cls, '__backend__', False):
for cfg in _possible_configs_for_cls(cls):
def _possible_configs_for_cls(cls, reasons=None):
all_configs = set(config.Config.all_configs())
+
if cls.__unsupported_on__:
spec = exclusions.db_spec(*cls.__unsupported_on__)
for config_obj in list(all_configs):
if spec(config_obj):
all_configs.remove(config_obj)
+
if getattr(cls, '__only_on__', None):
spec = exclusions.db_spec(*util.to_list(cls.__only_on__))
for config_obj in list(all_configs):
if all_configs.difference(non_preferred):
all_configs.difference_update(non_preferred)
- for db_spec, op, spec in getattr(cls, '__excluded_on__', ()):
- for config_obj in list(all_configs):
- if not exclusions.skip_if(
- exclusions.SpecPredicate(db_spec, op, spec)
- ).enabled_for_config(config_obj):
- all_configs.remove(config_obj)
-
return all_configs
def pytest_configure(config):
if hasattr(config, "slaveinput"):
+ plugin_base.restore_important_follower_config(config.slaveinput)
plugin_base.configure_follower(
config.slaveinput["follower_ident"]
)
def pytest_configure_node(node):
# the master for each node fills slaveinput dictionary
# which pytest-xdist will transfer to the subprocess
+
+ plugin_base.memoize_important_follower_config(node.slaveinput)
+
node.slaveinput["follower_ident"] = "test_%s" % next(_follower_count)
from . import provision
provision.create_follower_db(node.slaveinput["follower_ident"])
def pytest_pycollect_makeitem(collector, name, obj):
-
if inspect.isclass(obj) and plugin_base.want_class(obj):
return pytest.Class(name, parent=collector)
elif inspect.isfunction(obj) and \
- name.startswith("test_") and \
- isinstance(collector, pytest.Instance):
+ isinstance(collector, pytest.Instance) and \
+ plugin_base.want_method(collector.cls, obj):
return pytest.Function(name, parent=collector)
else:
return []
"""
from . import exclusions
+from .. import util
class Requirements(object):
return exclusions.skip_if(
lambda config: config.options.low_connections)
+ @property
+ def timing_intensive(self):
+ return exclusions.requires_tag("timing_intensive")
+
+ @property
+ def memory_intensive(self):
+ return exclusions.requires_tag("memory_intensive")
+
+ @property
+ def threading_with_mock(self):
+ """Mark tests that use threading and mock at the same time - stability
+ issues have been observed with coverage + python 3.3
+
+ """
+ return exclusions.skip_if(
+ lambda config: util.py3k and config.options.has_coverage,
+ "Stability issues with coverage + py3k"
+ )
+
+ @property
+ def no_coverage(self):
+ """Test should be skipped if coverage is enabled.
+
+ This is to block tests that exercise libraries that seem to be
+ sensitive to coverage, such as Postgresql notice logging.
+
+ """
+ return exclusions.skip_if(
+ lambda config: config.options.has_coverage,
+ "Issues observed when coverage is enabled"
+ )
+
def _has_mysql_on_windows(self, config):
return False
class MemUsageTest(EnsureZeroed):
+ __tags__ = 'memory_intensive',
__requires__ = 'cpython',
__backend__ = True
# currently not passing with pg 9.3 that does not seem to generate
# any notices here, would rather find a way to mock this
+ @testing.requires.no_coverage
@testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
def _test_notice_logging(self):
log = logging.getLogger('sqlalchemy.dialects.postgresql')
"""test DDL and reflection of PG-specific types """
- __only_on__ = 'postgresql'
- __excluded_on__ = (('postgresql', '<', (8, 3, 0)),)
+ __only_on__ = 'postgresql >= 8.3.0',
__backend__ = True
@classmethod
class PoolFirstConnectSyncTest(PoolTestBase):
# test [ticket:2964]
+ @testing.requires.timing_intensive
def test_sync(self):
pool = self._queuepool_fixture(pool_size=3, max_overflow=0)
max_overflow=-1)
def status(pool):
- tup = pool.size(), pool.checkedin(), pool.overflow(), \
+ return pool.size(), pool.checkedin(), pool.overflow(), \
pool.checkedout()
- print('Pool size: %d Connections in pool: %d Current '\
- 'Overflow: %d Current Checked out connections: %d' % tup)
- return tup
c1 = p.connect()
self.assert_(status(p) == (3, 0, -2, 1))
lazy_gc()
assert not pool._refs
+ @testing.requires.timing_intensive
def test_timeout(self):
p = self._queuepool_fixture(pool_size=3,
max_overflow=0,
assert int(time.time() - now) == 2
@testing.requires.threading_with_mock
+ @testing.requires.timing_intensive
def test_timeout_race(self):
# test a race condition where the initial connecting threads all race
# to queue.Empty, then block on the mutex. each thread consumes a
eq_(p._overflow, 1)
@testing.requires.threading_with_mock
+ @testing.requires.timing_intensive
def test_hanging_connect_within_overflow(self):
"""test that a single connect() call which is hanging
does not block other connections from proceeding."""
@testing.requires.threading_with_mock
+ @testing.requires.timing_intensive
def test_waiters_handled(self):
"""test that threads waiting for connections are
handled when the pool is replaced.
eq_(len(success), 12, "successes: %s" % success)
@testing.requires.threading_with_mock
+ @testing.requires.timing_intensive
def test_notify_waiters(self):
dbapi = MockDBAPI()
assert c3.connection is c2_con
@testing.requires.threading_with_mock
+ @testing.requires.timing_intensive
def test_no_overflow(self):
self._test_overflow(40, 0)
@testing.requires.threading_with_mock
+ @testing.requires.timing_intensive
def test_max_overflow(self):
self._test_overflow(40, 5)
c3 = p.connect()
assert id(c3.connection) != c_id
+ @testing.requires.timing_intensive
def test_recycle_on_invalidate(self):
p = self._queuepool_fixture(pool_size=1,
max_overflow=0)
c1.close()
self._assert_cleanup_on_pooled_reconnect(dbapi, p)
+ @testing.requires.timing_intensive
def test_error_on_pooled_reconnect_cleanup_recycle(self):
dbapi, p = self._queuepool_dbapi_fixture(pool_size=1,
max_overflow=2, recycle=1)
c1 = p.connect()
c1.close()
- time.sleep(1)
+ time.sleep(1.5)
self._assert_cleanup_on_pooled_reconnect(dbapi, p)
+ @testing.requires.timing_intensive
def test_recycle_pool_no_race(self):
def slow_close():
slow_closing_connection._slow_close()
succeeds_if,\
SpecPredicate,\
against,\
- LambdaPredicate
+ LambdaPredicate,\
+ requires_tag
def no_support(db, reason):
return SpecPredicate(db, description=reason)
"Not supported on MySQL + Windows"
)
- @property
- def threading_with_mock(self):
- """Mark tests that use threading and mock at the same time - stability
- issues have been observed with coverage + python 3.3
-
- """
- return skip_if(
- lambda config: util.py3k and
- config.options.has_coverage,
- "Stability issues with coverage + py3k"
- )
@property
def selectone(self):
class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
- __excluded_on__ = (
- ('mysql', '<', (4, 1, 1)), # screwy varbinary types
- )
@classmethod
def setup_class(cls):
[tox]
-envlist = coverage, full, lightweight, memusage
+envlist = full
[testenv]
deps=pytest
[testenv:full]
-[testenv:memusage]
-commands=
- python -m pytest test/aaa_profiling/test_memusage.py {posargs}
-
-[testenv:lightweight]
-commands=
- python -m pytest -k "not memusage" {posargs}
-
[testenv:coverage]
commands=
python -m pytest \
--cov=lib/sqlalchemy \
- -k "not memusage" \
+ --exclude-tag memory-intensive \
+ --exclude-tag timing-intensive \
{posargs}
python -m coverage xml --include=lib/sqlalchemy/*