From: Mike Bayer Date: Thu, 3 Mar 2016 14:14:55 +0000 (-0500) Subject: - updates from SQLA 1.1 for improved concurrency X-Git-Tag: rel_0_8_5~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cd44b77fa711aa79cc1d1ae08e019c072ebb9487;p=thirdparty%2Fsqlalchemy%2Falembic.git - updates from SQLA 1.1 for improved concurrency --- diff --git a/alembic/testing/plugin/plugin_base.py b/alembic/testing/plugin/plugin_base.py index 6383961a..1449b276 100644 --- a/alembic/testing/plugin/plugin_base.py +++ b/alembic/testing/plugin/plugin_base.py @@ -48,7 +48,6 @@ file_config = None logging = None -db_opts = {} include_tags = set() exclude_tags = set() options = None @@ -72,8 +71,6 @@ def setup_options(make_option): help="Drop all tables in the target database first") make_option("--backend-only", action="store_true", dest="backend_only", help="Run only tests marked with __backend__") - make_option("--mockpool", action="store_true", dest="mockpool", - help="Use mock pool (asserts only one connection used)") make_option("--low-connections", action="store_true", dest="low_connections", help="Use a low number of distinct connections - " @@ -95,8 +92,6 @@ def setup_options(make_option): make_option("--exclude-tag", action="callback", callback=_exclude_tag, type="string", help="Exclude tests with tag ") - make_option("--serverside", action="store_true", - help="Turn on server side cursors for PG") make_option("--mysql-engine", action="store", dest="mysql_engine", default=None, help="Use the specified MySQL storage engine for all tables, " @@ -125,7 +120,6 @@ def memoize_important_follower_config(dict_): """ dict_['memoized_config'] = { - 'db_opts': db_opts, 'include_tags': include_tags, 'exclude_tags': exclude_tags } @@ -137,7 +131,6 @@ def restore_important_follower_config(dict_): This invokes in the follower process. """ - db_opts.update(dict_['memoized_config']['db_opts']) include_tags.update(dict_['memoized_config']['include_tags']) exclude_tags.update(dict_['memoized_config']['exclude_tags']) @@ -228,11 +221,6 @@ def _setup_options(opt, file_config): options = opt -@pre -def _server_side_cursors(options, file_config): - if options.serverside: - db_opts['server_side_cursors'] = True - @pre def _monkeypatch_cdecimal(options, file_config): @@ -267,19 +255,12 @@ def _engine_uri(options, file_config): for db_url in db_urls: cfg = provision.setup_config( - db_url, db_opts, options, file_config, provision.FOLLOWER_IDENT) + db_url, options, file_config, provision.FOLLOWER_IDENT) if not config._current: cfg.set_as_current(cfg) -@post -def _engine_pool(options, file_config): - if options.mockpool: - from sqlalchemy import pool - db_opts['poolclass'] = pool.AssertionPool - - @post def _requirements(options, file_config): diff --git a/alembic/testing/provision.py b/alembic/testing/provision.py index 37ae1410..4ca40cf1 100644 --- a/alembic/testing/provision.py +++ b/alembic/testing/provision.py @@ -6,6 +6,10 @@ from sqlalchemy import text from ..util import compat from . import config, engines from .compat import get_url_backend_name +import os +import time +import logging +log = logging.getLogger(__name__) FOLLOWER_IDENT = None @@ -49,10 +53,13 @@ def configure_follower(follower_ident): _configure_follower(cfg, follower_ident) -def setup_config(db_url, db_opts, options, file_config, follower_ident): +def setup_config(db_url, options, file_config, follower_ident): if follower_ident: db_url = _follower_url_from_main(db_url, follower_ident) + db_opts = {} + _update_db_opts(db_url, db_opts) eng = engines.testing_engine(db_url, db_opts) + _post_configure_engine(db_url, eng, follower_ident) eng.connect().close() cfg = config.Config.register(eng, db_opts, options, file_config) if follower_ident: @@ -96,11 +103,21 @@ def _drop_db(cfg, eng, ident): raise NotImplementedError("no DB drop routine for cfg: %s" % eng.url) +@register.init +def _update_db_opts(db_url, db_opts): + pass + + @register.init def _configure_follower(cfg, ident): pass +@register.init +def _post_configure_engine(url, engine, follower_ident): + pass + + @register.init def _follower_url_from_main(url, ident): url = sa_url.make_url(url) @@ -108,6 +125,11 @@ def _follower_url_from_main(url, ident): return url +@_update_db_opts.for_db("mssql") +def _mssql_update_db_opts(db_url, db_opts): + db_opts['legacy_schema_aliasing'] = False + + @_follower_url_from_main.for_db("sqlite") def _sqlite_follower_url_from_main(url, ident): url = sa_url.make_url(url) @@ -117,16 +139,44 @@ def _sqlite_follower_url_from_main(url, ident): return sa_url.make_url("sqlite:///%s.db" % ident) +@_post_configure_engine.for_db("sqlite") +def _sqlite_post_configure_engine(url, engine, follower_ident): + from sqlalchemy import event + + @event.listens_for(engine, "connect") + def connect(dbapi_connection, connection_record): + # use file DBs in all cases, memory acts kind of strangely + # as an attached + if not follower_ident: + dbapi_connection.execute( + 'ATTACH DATABASE "test_schema.db" AS test_schema') + else: + dbapi_connection.execute( + 'ATTACH DATABASE "%s_test_schema.db" AS test_schema' + % follower_ident) + + @_create_db.for_db("postgresql") def _pg_create_db(cfg, eng, ident): with eng.connect().execution_options( isolation_level="AUTOCOMMIT") as conn: try: _pg_drop_db(cfg, conn, ident) - except: + except Exception: pass currentdb = conn.scalar("select current_database()") - conn.execute("CREATE DATABASE %s TEMPLATE %s" % (ident, currentdb)) + for attempt in range(3): + try: + conn.execute( + "CREATE DATABASE %s TEMPLATE %s" % (ident, currentdb)) + except exc.OperationalError as err: + if attempt != 2 and "accessed by other users" in str(err): + time.sleep(.2) + continue + else: + raise + else: + break @_create_db.for_db("mysql") @@ -134,7 +184,7 @@ def _mysql_create_db(cfg, eng, ident): with eng.connect() as conn: try: _mysql_drop_db(cfg, conn, ident) - except: + except Exception: pass conn.execute("CREATE DATABASE %s" % ident) conn.execute("CREATE DATABASE %s_test_schema" % ident) @@ -167,8 +217,10 @@ def _pg_drop_db(cfg, eng, ident): @_drop_db.for_db("sqlite") def _sqlite_drop_db(cfg, eng, ident): - pass - #os.remove("%s.db" % ident) + if ident: + os.remove("%s_test_schema.db" % ident) + else: + os.remove("%s.db" % ident) @_drop_db.for_db("mysql") @@ -176,13 +228,93 @@ def _mysql_drop_db(cfg, eng, ident): with eng.connect() as conn: try: conn.execute("DROP DATABASE %s_test_schema" % ident) - except: + except Exception: pass try: conn.execute("DROP DATABASE %s_test_schema_2" % ident) - except: + except Exception: pass try: conn.execute("DROP DATABASE %s" % ident) - except: + except Exception: pass + + +@_create_db.for_db("oracle") +def _oracle_create_db(cfg, eng, ident): + # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or + # similar, so that the default tablespace is not "system"; reflection will + # fail otherwise + with eng.connect() as conn: + conn.execute("create user %s identified by xe" % ident) + conn.execute("create user %s_ts1 identified by xe" % ident) + conn.execute("create user %s_ts2 identified by xe" % ident) + conn.execute("grant dba to %s" % (ident, )) + conn.execute("grant unlimited tablespace to %s" % ident) + conn.execute("grant unlimited tablespace to %s_ts1" % ident) + conn.execute("grant unlimited tablespace to %s_ts2" % ident) + +@_configure_follower.for_db("oracle") +def _oracle_configure_follower(config, ident): + config.test_schema = "%s_ts1" % ident + config.test_schema_2 = "%s_ts2" % ident + + +def _ora_drop_ignore(conn, dbname): + try: + conn.execute("drop user %s cascade" % dbname) + log.info("Reaped db: %s" % dbname) + return True + except exc.DatabaseError as err: + log.warn("couldn't drop db: %s" % err) + return False + + +@_drop_db.for_db("oracle") +def _oracle_drop_db(cfg, eng, ident): + with eng.connect() as conn: + # cx_Oracle seems to occasionally leak open connections when a large + # suite it run, even if we confirm we have zero references to + # connection objects. + # while there is a "kill session" command in Oracle, + # it unfortunately does not release the connection sufficiently. + _ora_drop_ignore(conn, ident) + _ora_drop_ignore(conn, "%s_ts1" % ident) + _ora_drop_ignore(conn, "%s_ts2" % ident) + + +def reap_oracle_dbs(eng): + log.info("Reaping Oracle dbs...") + with eng.connect() as conn: + to_reap = conn.execute( + "select u.username from all_users u where username " + "like 'TEST_%' and not exists (select username " + "from v$session where username=u.username)") + all_names = set([username.lower() for (username, ) in to_reap]) + to_drop = set() + for name in all_names: + if name.endswith("_ts1") or name.endswith("_ts2"): + continue + else: + to_drop.add(name) + if "%s_ts1" % name in all_names: + to_drop.add("%s_ts1" % name) + if "%s_ts2" % name in all_names: + to_drop.add("%s_ts2" % name) + + dropped = total = 0 + for total, username in enumerate(to_drop, 1): + if _ora_drop_ignore(conn, username): + dropped += 1 + log.info( + "Dropped %d out of %d stale databases detected", dropped, total) + + +@_follower_url_from_main.for_db("oracle") +def _oracle_follower_url_from_main(url, ident): + url = sa_url.make_url(url) + url.username = ident + url.password = 'xe' + return url + + diff --git a/reap_oracle_dbs.py b/reap_oracle_dbs.py new file mode 100644 index 00000000..ff638a01 --- /dev/null +++ b/reap_oracle_dbs.py @@ -0,0 +1,24 @@ +"""Drop Oracle databases that are left over from a +multiprocessing test run. + +Currently the cx_Oracle driver seems to sometimes not release a +TCP connection even if close() is called, which prevents the provisioning +system from dropping a database in-process. + +""" +from sqlalchemy.testing.plugin import plugin_base +from sqlalchemy.testing import engines +from sqlalchemy.testing import provision +import logging + +logging.basicConfig() +logging.getLogger(provision.__name__).setLevel(logging.INFO) + +plugin_base.read_config() +oracle = plugin_base.file_config.get('db', 'oracle') +from sqlalchemy.testing import provision + +engine = engines.testing_engine(oracle, {}) +provision.reap_oracle_dbs(engine) + + diff --git a/tox.ini b/tox.ini index c8226e91..15593d21 100644 --- a/tox.ini +++ b/tox.ini @@ -44,10 +44,7 @@ passenv=ORACLE_HOME NLS_LANG commands= {env:BASECOMMAND} {env:WORKERS} {env:SQLITE:} {env:POSTGRESQL:} {env:MYSQL:} {env:ORACLE:} {env:MSSQL:} {env:BACKENDONLY:} {env:COVERAGE:} {posargs} - -# note: oracle will need newer provisioning logic for oracle to work -# -# {oracle}: python reap_oracle_dbs.py + {oracle}: python reap_oracle_dbs.py [testenv:pep8]