test_schema = None
test_schema_2 = None
_current = None
-_skip_test_exception = None
+
+try:
+ from unittest import SkipTest as _skip_test_exception
+except ImportError:
+ _skip_test_exception = None
class Config(object):
def __init__(self, db, db_opts, options, file_config):
+ self._set_name(db)
self.db = db
self.db_opts = db_opts
self.options = options
self.test_schema_2 = "test_schema_2"
_stack = collections.deque()
- _configs = {}
+ _configs = set()
+
+ def _set_name(self, db):
+ if db.dialect.server_version_info:
+ svi = ".".join(str(tok) for tok in db.dialect.server_version_info)
+ self.name = "%s+%s_[%s]" % (db.name, db.driver, svi)
+ else:
+ self.name = "%s+%s" % (db.name, db.driver)
@classmethod
def register(cls, db, db_opts, options, file_config):
gets set as the "_current".
"""
cfg = Config(db, db_opts, options, file_config)
-
- cls._configs[cfg.db.name] = cfg
- cls._configs[(cfg.db.name, cfg.db.dialect)] = cfg
- cls._configs[cfg.db] = cfg
+ cls._configs.add(cfg)
return cfg
@classmethod
@classmethod
def all_configs(cls):
- for cfg in set(cls._configs.values()):
- yield cfg
+ return cls._configs
@classmethod
def all_dbs(cls):
def skip_test(msg):
raise _skip_test_exception(msg)
+
+++ /dev/null
-"""Quick and easy way to get setup.py test to run py.test without any
-custom setuptools/distutils code.
-
-"""
-import unittest
-import pytest
-
-
-class TestSuite(unittest.TestCase):
- def test_sqlalchemy(self):
- pytest.main(["-n", "4", "-q"])
help="Drop all tables in the target database first")
make_option("--backend-only", action="store_true", dest="backend_only",
help="Run only tests marked with __backend__")
+ make_option("--nomemory", action="store_true", dest="nomemory",
+ help="Don't run memory profiling tests")
make_option("--low-connections", action="store_true",
dest="low_connections",
help="Use a low number of distinct connections - "
warnings.setup_filters()
+
def _log(opt_str, value, parser):
global logging
if not logging:
options = opt
+@pre
+def _set_nomemory(opt, file_config):
+ if opt.nomemory:
+ exclude_tags.add("memory_intensive")
+
+
@pre
def _monkeypatch_cdecimal(options, file_config):
if options.cdecimal:
if not db_urls:
db_urls.append(file_config.get('db', 'default'))
+ config._current = None
for db_url in db_urls:
+
+ if options.write_idents and provision.FOLLOWER_IDENT: # != 'master':
+ with open(options.write_idents, "a") as file_:
+ file_.write(provision.FOLLOWER_IDENT + " " + db_url + "\n")
+
cfg = provision.setup_config(
db_url, options, file_config, provision.FOLLOWER_IDENT)
def generate_sub_tests(cls, module):
if getattr(cls, '__backend__', False):
for cfg in _possible_configs_for_cls(cls):
- name = "%s_%s_%s" % (cls.__name__, cfg.db.name, cfg.db.driver)
+ orig_name = cls.__name__
+
+ # we can have special chars in these names except for the
+ # pytest junit plugin, which is tripped up by the brackets
+ # and periods, so sanitize
+
+ alpha_name = re.sub('[_\[\]\.]+', '_', cfg.name)
+ alpha_name = re.sub('_+$', '', alpha_name)
+ name = "%s_%s" % (cls.__name__, alpha_name)
subcls = type(
name,
(cls, ),
{
- "__only_on__": ("%s+%s" % (cfg.db.name, cfg.db.driver)),
+ "_sa_orig_cls_name": orig_name,
+ "__only_on_config__": cfg
}
)
setattr(module, name, subcls)
config._current.reset(testing)
+def final_process_cleanup():
+ engines.testing_reaper._stop_test_ctx_aggressive()
+ assertions.global_cleanup_assertions()
+ _restore_engine()
+
+
def _setup_engine(cls):
if getattr(cls, '__engine_options__', None):
eng = engines.testing_engine(options=cls.__engine_options__)
# like a nose id, e.g.:
# "test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause"
- name = test_class.__name__
- suffix = "_%s_%s" % (config.db.name, config.db.driver)
- if name.endswith(suffix):
- name = name[0:-(len(suffix))]
+ name = getattr(test_class, '_sa_orig_cls_name', test_class.__name__)
id_ = "%s.%s.%s" % (test_module_name, name, test_name)
if not spec(config_obj):
all_configs.remove(config_obj)
+ if getattr(cls, '__only_on_config__', None):
+ all_configs.intersection_update([cls.__only_on_config__])
+
if hasattr(cls, '__requires__'):
requirements = config.requirements
for config_obj in list(all_configs):
plugin_base.configure_follower(
config.slaveinput["follower_ident"]
)
-
- if config.option.write_idents:
- with open(config.option.write_idents, "a") as file_:
- file_.write(config.slaveinput["follower_ident"] + "\n")
else:
if config.option.write_idents and \
os.path.exists(config.option.write_idents):
def pytest_sessionstart(session):
plugin_base.post_begin()
+
+def pytest_sessionfinish(session):
+ plugin_base.final_process_cleanup()
+
+
if has_xdist:
import uuid
from sqlalchemy.engine import url as sa_url
+from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy import exc
from sqlalchemy.util import compat
from . import config, engines
+import collections
+import os
import time
import logging
-import os
log = logging.getLogger(__name__)
FOLLOWER_IDENT = None
def create_follower_db(follower_ident):
for cfg in _configs_for_db_operation():
+ log.info("CREATE database %s, URI %r", follower_ident, cfg.db.url)
_create_db(cfg, cfg.db, follower_ident)
eng = engines.testing_engine(db_url, db_opts)
_post_configure_engine(db_url, eng, follower_ident)
eng.connect().close()
+
cfg = config.Config.register(eng, db_opts, options, file_config)
if follower_ident:
_configure_follower(cfg, follower_ident)
def drop_follower_db(follower_ident):
for cfg in _configs_for_db_operation():
+ log.info("DROP database %s, URI %r", follower_ident, cfg.db.url)
_drop_db(cfg, cfg.db, follower_ident)
except Exception:
pass
currentdb = conn.scalar("select current_database()")
- for attempt in range(3):
+ for attempt in range(10):
try:
conn.execute(
"CREATE DATABASE %s TEMPLATE %s" % (ident, currentdb))
except exc.OperationalError as err:
- if attempt != 2 and "accessed by other users" in str(err):
- time.sleep(.2)
- continue
- else:
- raise
+ if "accessed by other users" in str(err):
+ log.info(
+ "Waiting to create %s, URI %r, "
+ "template DB is in use sleeping for .5",
+ ident, eng.url)
+ time.sleep(.5)
else:
break
+ else:
+ raise err
@_create_db.for_db("mysql")
db_opts['_retry_on_12516'] = True
-def reap_oracle_dbs(eng, idents_file):
+def reap_oracle_dbs(idents_file):
log.info("Reaping Oracle dbs...")
- with eng.connect() as conn:
- with open(idents_file) as file_:
- idents = set(line.strip() for line in file_)
-
- log.info("identifiers in file: %s", ", ".join(idents))
-
- to_reap = conn.execute(
- "select u.username from all_users u where username "
- "like 'TEST_%' and not exists (select username "
- "from v$session where username=u.username)")
- all_names = set([username.lower() for (username, ) in to_reap])
- to_drop = set()
- for name in all_names:
- if name.endswith("_ts1") or name.endswith("_ts2"):
- continue
- elif name in idents:
- to_drop.add(name)
- if "%s_ts1" % name in all_names:
- to_drop.add("%s_ts1" % name)
- if "%s_ts2" % name in all_names:
- to_drop.add("%s_ts2" % name)
-
- dropped = total = 0
- for total, username in enumerate(to_drop, 1):
- if _ora_drop_ignore(conn, username):
- dropped += 1
- log.info(
- "Dropped %d out of %d stale databases detected", dropped, total)
+
+ urls = collections.defaultdict(list)
+ with open(idents_file) as file_:
+ for line in file_:
+ line = line.strip()
+ db_name, db_url = line.split(" ")
+ urls[db_url].append(db_name)
+
+ for url in urls:
+ if not url.startswith("oracle"):
+ continue
+ idents = urls[url]
+ log.info("db reaper connecting to %r", url)
+ eng = create_engine(url)
+ with eng.connect() as conn:
+
+ log.info("identifiers in file: %s", ", ".join(idents))
+
+ to_reap = conn.execute(
+ "select u.username from all_users u where username "
+ "like 'TEST_%' and not exists (select username "
+ "from v$session where username=u.username)")
+ all_names = {username.lower() for (username, ) in to_reap}
+ to_drop = set()
+ for name in all_names:
+ if name.endswith("_ts1") or name.endswith("_ts2"):
+ continue
+ elif name in idents:
+ to_drop.add(name)
+ if "%s_ts1" % name in all_names:
+ to_drop.add("%s_ts1" % name)
+ if "%s_ts2" % name in all_names:
+ to_drop.add("%s_ts2" % name)
+
+ dropped = total = 0
+ for total, username in enumerate(to_drop, 1):
+ if _ora_drop_ignore(conn, username):
+ dropped += 1
+ log.info(
+ "Dropped %d out of %d stale databases detected",
+ dropped, total)
@_follower_url_from_main.for_db("oracle")
system from dropping a database in-process.
"""
-from sqlalchemy.testing.plugin import plugin_base
-from sqlalchemy.testing import engines
from sqlalchemy.testing import provision
import logging
import sys
logging.basicConfig()
logging.getLogger(provision.__name__).setLevel(logging.INFO)
-plugin_base.read_config()
-oracle = plugin_base.file_config.get('db', 'oracle')
-from sqlalchemy.testing import provision
-
-engine = engines.testing_engine(oracle, {})
-provision.reap_oracle_dbs(engine, sys.argv[1])
+provision.reap_oracle_dbs(sys.argv[1])
def check(config):
if not against(config, 'mysql'):
return False
- row = config.db.execute("show variables like 'sql_mode'").first()
- return not row or "STRICT" not in row[1]
+ row = config.db.execute("show variables like 'sql_mode'").first()
+ return not row or "STRICT" not in row[1]
return only_if(check)
setenv=
PYTHONPATH=
PYTHONNOUSERSITE=1
- BASECOMMAND=python -m pytest
+ BASECOMMAND=python -m pytest --log-info=sqlalchemy.testing
WORKERS={env:TOX_WORKERS:-n4}
oracle: WORKERS={env:TOX_WORKERS:-n2}