class Config(object):
def __init__(self, db, db_opts, options, file_config):
+ self._set_name(db)
self.db = db
self.db_opts = db_opts
self.options = options
self.test_schema_2 = "test_schema_2"
_stack = collections.deque()
- _configs = {}
+ _configs = set()
+
+ def _set_name(self, db):
+ if db.dialect.server_version_info:
+ svi = ".".join(str(tok) for tok in db.dialect.server_version_info)
+ self.name = "%s+%s_[%s]" % (db.name, db.driver, svi)
+ else:
+ self.name = "%s+%s" % (db.name, db.driver)
@classmethod
def register(cls, db, db_opts, options, file_config):
gets set as the "_current".
"""
cfg = Config(db, db_opts, options, file_config)
-
- cls._configs[cfg.db.name] = cfg
- cls._configs[(cfg.db.name, cfg.db.dialect)] = cfg
- cls._configs[cfg.db] = cfg
+ cls._configs.add(cfg)
return cfg
@classmethod
@classmethod
def all_configs(cls):
- for cfg in set(cls._configs.values()):
- yield cfg
+ return cls._configs
@classmethod
def all_dbs(cls):
db_urls.append(file_config.get('db', 'default'))
for db_url in db_urls:
+
+ if options.write_idents and provision.FOLLOWER_IDENT: # != 'master':
+ with open(options.write_idents, "a") as file_:
+ file_.write(provision.FOLLOWER_IDENT + " " + db_url + "\n")
+
cfg = provision.setup_config(
db_url, options, file_config, provision.FOLLOWER_IDENT)
def generate_sub_tests(cls, module):
if getattr(cls, '__backend__', False):
for cfg in _possible_configs_for_cls(cls):
- name = "%s_%s_%s" % (cls.__name__, cfg.db.name, cfg.db.driver)
+ orig_name = cls.__name__
+ name = "%s_%s" % (cls.__name__, cfg.name)
subcls = type(
name,
(cls, ),
{
- "__only_on__": ("%s+%s" % (cfg.db.name, cfg.db.driver)),
+ "_sa_orig_cls_name": orig_name,
+ "__only_on_config__": cfg
}
)
setattr(module, name, subcls)
if not spec(config_obj):
all_configs.remove(config_obj)
+ if getattr(cls, '__only_on_config__', None):
+ all_configs.intersection_update([cls.__only_on_config__])
+
if hasattr(cls, '__requires__'):
requirements = config.requirements
for config_obj in list(all_configs):
plugin_base.configure_follower(
config.slaveinput["follower_ident"]
)
- if config.option.write_idents:
- with open(config.option.write_idents, "a") as file_:
- file_.write(config.slaveinput["follower_ident"] + "\n")
else:
if config.option.write_idents and \
os.path.exists(config.option.write_idents):
os.remove(config.option.write_idents)
-
plugin_base.pre_begin(config.option)
- coverage = bool(getattr(config.option, "cov_source", False))
- plugin_base.set_coverage_flag(coverage)
+ plugin_base.set_coverage_flag(bool(getattr(config.option,
+ "cov_source", False)))
def pytest_sessionstart(session):
this should be removable when Alembic targets SQLAlchemy 1.0.0
"""
from sqlalchemy.engine import url as sa_url
+from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy import exc
from ..util import compat
from . import config, engines
from .compat import get_url_backend_name
+import collections
import os
import time
import logging
+
log = logging.getLogger(__name__)
FOLLOWER_IDENT = None
eng = engines.testing_engine(db_url, db_opts)
_post_configure_engine(db_url, eng, follower_ident)
eng.connect().close()
+
cfg = config.Config.register(eng, db_opts, options, file_config)
if follower_ident:
_configure_follower(cfg, follower_ident)
_ora_drop_ignore(conn, "%s_ts2" % ident)
-def reap_oracle_dbs(eng, idents_file):
+def reap_oracle_dbs(idents_file):
log.info("Reaping Oracle dbs...")
- with eng.connect() as conn:
- with open(idents_file) as file_:
- idents = set(line.strip() for line in file_)
-
- log.info("identifiers in file: %s", ", ".join(idents))
-
- to_reap = conn.execute(
- "select u.username from all_users u where username "
- "like 'TEST_%' and not exists (select username "
- "from v$session where username=u.username)")
- all_names = set([username.lower() for (username, ) in to_reap])
- to_drop = set()
- for name in all_names:
- if name.endswith("_ts1") or name.endswith("_ts2"):
- continue
- elif name in idents:
- to_drop.add(name)
- if "%s_ts1" % name in all_names:
- to_drop.add("%s_ts1" % name)
- if "%s_ts2" % name in all_names:
- to_drop.add("%s_ts2" % name)
-
- dropped = total = 0
- for total, username in enumerate(to_drop, 1):
- if _ora_drop_ignore(conn, username):
- dropped += 1
- log.info(
- "Dropped %d out of %d stale databases detected", dropped, total)
+
+ urls = collections.defaultdict(list)
+ with open(idents_file) as file_:
+ for line in file_:
+ line = line.strip()
+ db_name, db_url = line.split(" ")
+ urls[db_url].append(db_name)
+
+ for url in urls:
+ idents = urls[url]
+ log.info("db reaper connecting to %r", url)
+ eng = create_engine(url)
+ with eng.connect() as conn:
+
+ log.info("identifiers in file: %s", ", ".join(idents))
+
+ to_reap = conn.execute(
+ "select u.username from all_users u where username "
+ "like 'TEST_%' and not exists (select username "
+ "from v$session where username=u.username)")
+ all_names = {username.lower() for (username, ) in to_reap}
+ to_drop = set()
+ for name in all_names:
+ if name.endswith("_ts1") or name.endswith("_ts2"):
+ continue
+ elif name in idents:
+ to_drop.add(name)
+ if "%s_ts1" % name in all_names:
+ to_drop.add("%s_ts1" % name)
+ if "%s_ts2" % name in all_names:
+ to_drop.add("%s_ts2" % name)
+
+ dropped = total = 0
+ for total, username in enumerate(to_drop, 1):
+ if _ora_drop_ignore(conn, username):
+ dropped += 1
+ log.info(
+ "Dropped %d out of %d stale databases detected",
+ dropped, total)
@_follower_url_from_main.for_db("oracle")
system from dropping a database in-process.
"""
-from alembic.testing.plugin import plugin_base
-from alembic.testing import engines
from alembic.testing import provision
import logging
import sys
logging.basicConfig()
logging.getLogger(provision.__name__).setLevel(logging.INFO)
-plugin_base.read_config()
-oracle = plugin_base.file_config.get('db', 'oracle')
-
-engine = engines.testing_engine(oracle, {})
-provision.reap_oracle_dbs(engine, sys.argv[1])
+provision.reap_oracle_dbs(sys.argv[1])