logging = None
-db_opts = {}
include_tags = set()
exclude_tags = set()
options = None
help="Drop all tables in the target database first")
make_option("--backend-only", action="store_true", dest="backend_only",
help="Run only tests marked with __backend__")
- make_option("--mockpool", action="store_true", dest="mockpool",
- help="Use mock pool (asserts only one connection used)")
make_option("--low-connections", action="store_true",
dest="low_connections",
help="Use a low number of distinct connections - "
make_option("--exclude-tag", action="callback", callback=_exclude_tag,
type="string",
help="Exclude tests with tag <tag>")
- make_option("--serverside", action="store_true",
- help="Turn on server side cursors for PG")
make_option("--mysql-engine", action="store",
dest="mysql_engine", default=None,
help="Use the specified MySQL storage engine for all tables, "
"""
dict_['memoized_config'] = {
- 'db_opts': db_opts,
'include_tags': include_tags,
'exclude_tags': exclude_tags
}
This invokes in the follower process.
"""
- db_opts.update(dict_['memoized_config']['db_opts'])
include_tags.update(dict_['memoized_config']['include_tags'])
exclude_tags.update(dict_['memoized_config']['exclude_tags'])
options = opt
-@pre
-def _server_side_cursors(options, file_config):
- if options.serverside:
- db_opts['server_side_cursors'] = True
-
@pre
def _monkeypatch_cdecimal(options, file_config):
for db_url in db_urls:
cfg = provision.setup_config(
- db_url, db_opts, options, file_config, provision.FOLLOWER_IDENT)
+ db_url, options, file_config, provision.FOLLOWER_IDENT)
if not config._current:
cfg.set_as_current(cfg)
-@post
-def _engine_pool(options, file_config):
- if options.mockpool:
- from sqlalchemy import pool
- db_opts['poolclass'] = pool.AssertionPool
-
-
@post
def _requirements(options, file_config):
from ..util import compat
from . import config, engines
from .compat import get_url_backend_name
+import os
+import time
+import logging
+log = logging.getLogger(__name__)
FOLLOWER_IDENT = None
_configure_follower(cfg, follower_ident)
-def setup_config(db_url, db_opts, options, file_config, follower_ident):
+def setup_config(db_url, options, file_config, follower_ident):
if follower_ident:
db_url = _follower_url_from_main(db_url, follower_ident)
+ db_opts = {}
+ _update_db_opts(db_url, db_opts)
eng = engines.testing_engine(db_url, db_opts)
+ _post_configure_engine(db_url, eng, follower_ident)
eng.connect().close()
cfg = config.Config.register(eng, db_opts, options, file_config)
if follower_ident:
raise NotImplementedError("no DB drop routine for cfg: %s" % eng.url)
+@register.init
+def _update_db_opts(db_url, db_opts):
+ pass
+
+
@register.init
def _configure_follower(cfg, ident):
pass
+@register.init
+def _post_configure_engine(url, engine, follower_ident):
+ pass
+
+
@register.init
def _follower_url_from_main(url, ident):
url = sa_url.make_url(url)
return url
+@_update_db_opts.for_db("mssql")
+def _mssql_update_db_opts(db_url, db_opts):
+ db_opts['legacy_schema_aliasing'] = False
+
+
@_follower_url_from_main.for_db("sqlite")
def _sqlite_follower_url_from_main(url, ident):
url = sa_url.make_url(url)
return sa_url.make_url("sqlite:///%s.db" % ident)
+@_post_configure_engine.for_db("sqlite")
+def _sqlite_post_configure_engine(url, engine, follower_ident):
+ from sqlalchemy import event
+
+ @event.listens_for(engine, "connect")
+ def connect(dbapi_connection, connection_record):
+ # use file DBs in all cases, memory acts kind of strangely
+ # as an attached
+ if not follower_ident:
+ dbapi_connection.execute(
+ 'ATTACH DATABASE "test_schema.db" AS test_schema')
+ else:
+ dbapi_connection.execute(
+ 'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
+ % follower_ident)
+
+
@_create_db.for_db("postgresql")
def _pg_create_db(cfg, eng, ident):
with eng.connect().execution_options(
isolation_level="AUTOCOMMIT") as conn:
try:
_pg_drop_db(cfg, conn, ident)
- except:
+ except Exception:
pass
currentdb = conn.scalar("select current_database()")
- conn.execute("CREATE DATABASE %s TEMPLATE %s" % (ident, currentdb))
+ for attempt in range(3):
+ try:
+ conn.execute(
+ "CREATE DATABASE %s TEMPLATE %s" % (ident, currentdb))
+ except exc.OperationalError as err:
+ if attempt != 2 and "accessed by other users" in str(err):
+ time.sleep(.2)
+ continue
+ else:
+ raise
+ else:
+ break
@_create_db.for_db("mysql")
with eng.connect() as conn:
try:
_mysql_drop_db(cfg, conn, ident)
- except:
+ except Exception:
pass
conn.execute("CREATE DATABASE %s" % ident)
conn.execute("CREATE DATABASE %s_test_schema" % ident)
@_drop_db.for_db("sqlite")
def _sqlite_drop_db(cfg, eng, ident):
- pass
- #os.remove("%s.db" % ident)
+ if ident:
+ os.remove("%s_test_schema.db" % ident)
+ else:
+ os.remove("%s.db" % ident)
@_drop_db.for_db("mysql")
with eng.connect() as conn:
try:
conn.execute("DROP DATABASE %s_test_schema" % ident)
- except:
+ except Exception:
pass
try:
conn.execute("DROP DATABASE %s_test_schema_2" % ident)
- except:
+ except Exception:
pass
try:
conn.execute("DROP DATABASE %s" % ident)
- except:
+ except Exception:
pass
+
+
+@_create_db.for_db("oracle")
+def _oracle_create_db(cfg, eng, ident):
+ # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
+ # similar, so that the default tablespace is not "system"; reflection will
+ # fail otherwise
+ with eng.connect() as conn:
+ conn.execute("create user %s identified by xe" % ident)
+ conn.execute("create user %s_ts1 identified by xe" % ident)
+ conn.execute("create user %s_ts2 identified by xe" % ident)
+ conn.execute("grant dba to %s" % (ident, ))
+ conn.execute("grant unlimited tablespace to %s" % ident)
+ conn.execute("grant unlimited tablespace to %s_ts1" % ident)
+ conn.execute("grant unlimited tablespace to %s_ts2" % ident)
+
+@_configure_follower.for_db("oracle")
+def _oracle_configure_follower(config, ident):
+ config.test_schema = "%s_ts1" % ident
+ config.test_schema_2 = "%s_ts2" % ident
+
+
+def _ora_drop_ignore(conn, dbname):
+ try:
+ conn.execute("drop user %s cascade" % dbname)
+ log.info("Reaped db: %s" % dbname)
+ return True
+ except exc.DatabaseError as err:
+ log.warn("couldn't drop db: %s" % err)
+ return False
+
+
+@_drop_db.for_db("oracle")
+def _oracle_drop_db(cfg, eng, ident):
+ with eng.connect() as conn:
+ # cx_Oracle seems to occasionally leak open connections when a large
+ # suite it run, even if we confirm we have zero references to
+ # connection objects.
+ # while there is a "kill session" command in Oracle,
+ # it unfortunately does not release the connection sufficiently.
+ _ora_drop_ignore(conn, ident)
+ _ora_drop_ignore(conn, "%s_ts1" % ident)
+ _ora_drop_ignore(conn, "%s_ts2" % ident)
+
+
+def reap_oracle_dbs(eng):
+ log.info("Reaping Oracle dbs...")
+ with eng.connect() as conn:
+ to_reap = conn.execute(
+ "select u.username from all_users u where username "
+ "like 'TEST_%' and not exists (select username "
+ "from v$session where username=u.username)")
+ all_names = set([username.lower() for (username, ) in to_reap])
+ to_drop = set()
+ for name in all_names:
+ if name.endswith("_ts1") or name.endswith("_ts2"):
+ continue
+ else:
+ to_drop.add(name)
+ if "%s_ts1" % name in all_names:
+ to_drop.add("%s_ts1" % name)
+ if "%s_ts2" % name in all_names:
+ to_drop.add("%s_ts2" % name)
+
+ dropped = total = 0
+ for total, username in enumerate(to_drop, 1):
+ if _ora_drop_ignore(conn, username):
+ dropped += 1
+ log.info(
+ "Dropped %d out of %d stale databases detected", dropped, total)
+
+
+@_follower_url_from_main.for_db("oracle")
+def _oracle_follower_url_from_main(url, ident):
+ url = sa_url.make_url(url)
+ url.username = ident
+ url.password = 'xe'
+ return url
+
+