]> git.ipfire.org Git - thirdparty/sqlalchemy/alembic.git/commitdiff
- updates from SQLA 1.1 for improved concurrency
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 3 Mar 2016 14:14:55 +0000 (09:14 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 3 Mar 2016 14:14:55 +0000 (09:14 -0500)
alembic/testing/plugin/plugin_base.py
alembic/testing/provision.py
reap_oracle_dbs.py [new file with mode: 0644]
tox.ini

index 6383961a5dcecd634ecb70eab7a04ad31b0187db..1449b27677842a0a55a5fdd11f52de5e0fb6cdd0 100644 (file)
@@ -48,7 +48,6 @@ file_config = None
 
 
 logging = None
-db_opts = {}
 include_tags = set()
 exclude_tags = set()
 options = None
@@ -72,8 +71,6 @@ def setup_options(make_option):
                 help="Drop all tables in the target database first")
     make_option("--backend-only", action="store_true", dest="backend_only",
                 help="Run only tests marked with __backend__")
-    make_option("--mockpool", action="store_true", dest="mockpool",
-                help="Use mock pool (asserts only one connection used)")
     make_option("--low-connections", action="store_true",
                 dest="low_connections",
                 help="Use a low number of distinct connections - "
@@ -95,8 +92,6 @@ def setup_options(make_option):
     make_option("--exclude-tag", action="callback", callback=_exclude_tag,
                 type="string",
                 help="Exclude tests with tag <tag>")
-    make_option("--serverside", action="store_true",
-                help="Turn on server side cursors for PG")
     make_option("--mysql-engine", action="store",
                 dest="mysql_engine", default=None,
                 help="Use the specified MySQL storage engine for all tables, "
@@ -125,7 +120,6 @@ def memoize_important_follower_config(dict_):
 
     """
     dict_['memoized_config'] = {
-        'db_opts': db_opts,
         'include_tags': include_tags,
         'exclude_tags': exclude_tags
     }
@@ -137,7 +131,6 @@ def restore_important_follower_config(dict_):
     This invokes in the follower process.
 
     """
-    db_opts.update(dict_['memoized_config']['db_opts'])
     include_tags.update(dict_['memoized_config']['include_tags'])
     exclude_tags.update(dict_['memoized_config']['exclude_tags'])
 
@@ -228,11 +221,6 @@ def _setup_options(opt, file_config):
     options = opt
 
 
-@pre
-def _server_side_cursors(options, file_config):
-    if options.serverside:
-        db_opts['server_side_cursors'] = True
-
 
 @pre
 def _monkeypatch_cdecimal(options, file_config):
@@ -267,19 +255,12 @@ def _engine_uri(options, file_config):
 
     for db_url in db_urls:
         cfg = provision.setup_config(
-            db_url, db_opts, options, file_config, provision.FOLLOWER_IDENT)
+            db_url, options, file_config, provision.FOLLOWER_IDENT)
 
         if not config._current:
             cfg.set_as_current(cfg)
 
 
-@post
-def _engine_pool(options, file_config):
-    if options.mockpool:
-        from sqlalchemy import pool
-        db_opts['poolclass'] = pool.AssertionPool
-
-
 @post
 def _requirements(options, file_config):
 
index 37ae1410408ccdce3978a8425663a69cd7fcaab0..4ca40cf1f24676e508a5b5942f12435e6cbfada8 100644 (file)
@@ -6,6 +6,10 @@ from sqlalchemy import text
 from ..util import compat
 from . import config, engines
 from .compat import get_url_backend_name
+import os
+import time
+import logging
+log = logging.getLogger(__name__)
 
 FOLLOWER_IDENT = None
 
@@ -49,10 +53,13 @@ def configure_follower(follower_ident):
         _configure_follower(cfg, follower_ident)
 
 
-def setup_config(db_url, db_opts, options, file_config, follower_ident):
+def setup_config(db_url, options, file_config, follower_ident):
     if follower_ident:
         db_url = _follower_url_from_main(db_url, follower_ident)
+    db_opts = {}
+    _update_db_opts(db_url, db_opts)
     eng = engines.testing_engine(db_url, db_opts)
+    _post_configure_engine(db_url, eng, follower_ident)
     eng.connect().close()
     cfg = config.Config.register(eng, db_opts, options, file_config)
     if follower_ident:
@@ -96,11 +103,21 @@ def _drop_db(cfg, eng, ident):
     raise NotImplementedError("no DB drop routine for cfg: %s" % eng.url)
 
 
+@register.init
+def _update_db_opts(db_url, db_opts):
+    pass
+
+
 @register.init
 def _configure_follower(cfg, ident):
     pass
 
 
+@register.init
+def _post_configure_engine(url, engine, follower_ident):
+    pass
+
+
 @register.init
 def _follower_url_from_main(url, ident):
     url = sa_url.make_url(url)
@@ -108,6 +125,11 @@ def _follower_url_from_main(url, ident):
     return url
 
 
+@_update_db_opts.for_db("mssql")
+def _mssql_update_db_opts(db_url, db_opts):
+    db_opts['legacy_schema_aliasing'] = False
+
+
 @_follower_url_from_main.for_db("sqlite")
 def _sqlite_follower_url_from_main(url, ident):
     url = sa_url.make_url(url)
@@ -117,16 +139,44 @@ def _sqlite_follower_url_from_main(url, ident):
         return sa_url.make_url("sqlite:///%s.db" % ident)
 
 
+@_post_configure_engine.for_db("sqlite")
+def _sqlite_post_configure_engine(url, engine, follower_ident):
+    from sqlalchemy import event
+
+    @event.listens_for(engine, "connect")
+    def connect(dbapi_connection, connection_record):
+        # use file DBs in all cases, memory acts kind of strangely
+        # as an attached
+        if not follower_ident:
+            dbapi_connection.execute(
+                'ATTACH DATABASE "test_schema.db" AS test_schema')
+        else:
+            dbapi_connection.execute(
+                'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
+                % follower_ident)
+
+
 @_create_db.for_db("postgresql")
 def _pg_create_db(cfg, eng, ident):
     with eng.connect().execution_options(
             isolation_level="AUTOCOMMIT") as conn:
         try:
             _pg_drop_db(cfg, conn, ident)
-        except:
+        except Exception:
             pass
         currentdb = conn.scalar("select current_database()")
-        conn.execute("CREATE DATABASE %s TEMPLATE %s" % (ident, currentdb))
+        for attempt in range(3):
+            try:
+                conn.execute(
+                    "CREATE DATABASE %s TEMPLATE %s" % (ident, currentdb))
+            except exc.OperationalError as err:
+                if attempt != 2 and "accessed by other users" in str(err):
+                    time.sleep(.2)
+                    continue
+                else:
+                    raise
+            else:
+                break
 
 
 @_create_db.for_db("mysql")
@@ -134,7 +184,7 @@ def _mysql_create_db(cfg, eng, ident):
     with eng.connect() as conn:
         try:
             _mysql_drop_db(cfg, conn, ident)
-        except:
+        except Exception:
             pass
         conn.execute("CREATE DATABASE %s" % ident)
         conn.execute("CREATE DATABASE %s_test_schema" % ident)
@@ -167,8 +217,10 @@ def _pg_drop_db(cfg, eng, ident):
 
 @_drop_db.for_db("sqlite")
 def _sqlite_drop_db(cfg, eng, ident):
-    pass
-    #os.remove("%s.db" % ident)
+    if ident:
+        os.remove("%s_test_schema.db" % ident)
+    else:
+        os.remove("%s.db" % ident)
 
 
 @_drop_db.for_db("mysql")
@@ -176,13 +228,93 @@ def _mysql_drop_db(cfg, eng, ident):
     with eng.connect() as conn:
         try:
             conn.execute("DROP DATABASE %s_test_schema" % ident)
-        except:
+        except Exception:
             pass
         try:
             conn.execute("DROP DATABASE %s_test_schema_2" % ident)
-        except:
+        except Exception:
             pass
         try:
             conn.execute("DROP DATABASE %s" % ident)
-        except:
+        except Exception:
             pass
+
+
+@_create_db.for_db("oracle")
+def _oracle_create_db(cfg, eng, ident):
+    # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
+    # similar, so that the default tablespace is not "system"; reflection will
+    # fail otherwise
+    with eng.connect() as conn:
+        conn.execute("create user %s identified by xe" % ident)
+        conn.execute("create user %s_ts1 identified by xe" % ident)
+        conn.execute("create user %s_ts2 identified by xe" % ident)
+        conn.execute("grant dba to %s" % (ident, ))
+        conn.execute("grant unlimited tablespace to %s" % ident)
+        conn.execute("grant unlimited tablespace to %s_ts1" % ident)
+        conn.execute("grant unlimited tablespace to %s_ts2" % ident)
+
+@_configure_follower.for_db("oracle")
+def _oracle_configure_follower(config, ident):
+    config.test_schema = "%s_ts1" % ident
+    config.test_schema_2 = "%s_ts2" % ident
+
+
+def _ora_drop_ignore(conn, dbname):
+    try:
+        conn.execute("drop user %s cascade" % dbname)
+        log.info("Reaped db: %s" % dbname)
+        return True
+    except exc.DatabaseError as err:
+        log.warn("couldn't drop db: %s" % err)
+        return False
+
+
+@_drop_db.for_db("oracle")
+def _oracle_drop_db(cfg, eng, ident):
+    with eng.connect() as conn:
+        # cx_Oracle seems to occasionally leak open connections when a large
+        # suite it run, even if we confirm we have zero references to
+        # connection objects.
+        # while there is a "kill session" command in Oracle,
+        # it unfortunately does not release the connection sufficiently.
+        _ora_drop_ignore(conn, ident)
+        _ora_drop_ignore(conn, "%s_ts1" % ident)
+        _ora_drop_ignore(conn, "%s_ts2" % ident)
+
+
+def reap_oracle_dbs(eng):
+    log.info("Reaping Oracle dbs...")
+    with eng.connect() as conn:
+        to_reap = conn.execute(
+            "select u.username from all_users u where username "
+            "like 'TEST_%' and not exists (select username "
+            "from v$session where username=u.username)")
+        all_names = set([username.lower() for (username, ) in to_reap])
+        to_drop = set()
+        for name in all_names:
+            if name.endswith("_ts1") or name.endswith("_ts2"):
+                continue
+            else:
+                to_drop.add(name)
+                if "%s_ts1" % name in all_names:
+                    to_drop.add("%s_ts1" % name)
+                if "%s_ts2" % name in all_names:
+                    to_drop.add("%s_ts2" % name)
+
+        dropped = total = 0
+        for total, username in enumerate(to_drop, 1):
+            if _ora_drop_ignore(conn, username):
+                dropped += 1
+        log.info(
+            "Dropped %d out of %d stale databases detected", dropped, total)
+
+
+@_follower_url_from_main.for_db("oracle")
+def _oracle_follower_url_from_main(url, ident):
+    url = sa_url.make_url(url)
+    url.username = ident
+    url.password = 'xe'
+    return url
+
+
diff --git a/reap_oracle_dbs.py b/reap_oracle_dbs.py
new file mode 100644 (file)
index 0000000..ff638a0
--- /dev/null
@@ -0,0 +1,24 @@
+"""Drop Oracle databases that are left over from a 
+multiprocessing test run.
+
+Currently the cx_Oracle driver seems to sometimes not release a
+TCP connection even if close() is called, which prevents the provisioning
+system from dropping a database in-process.
+
+"""
+from sqlalchemy.testing.plugin import plugin_base
+from sqlalchemy.testing import engines
+from sqlalchemy.testing import provision
+import logging
+
+logging.basicConfig()
+logging.getLogger(provision.__name__).setLevel(logging.INFO)
+
+plugin_base.read_config()
+oracle = plugin_base.file_config.get('db', 'oracle')
+from sqlalchemy.testing import provision
+
+engine = engines.testing_engine(oracle, {})
+provision.reap_oracle_dbs(engine)
+
+
diff --git a/tox.ini b/tox.ini
index c8226e91e69724e7a86a4b98902a0c0726ce84ac..15593d219db64af5b4f1cf8d226c473331e0f052 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -44,10 +44,7 @@ passenv=ORACLE_HOME NLS_LANG
 
 commands=
   {env:BASECOMMAND} {env:WORKERS} {env:SQLITE:} {env:POSTGRESQL:} {env:MYSQL:} {env:ORACLE:} {env:MSSQL:} {env:BACKENDONLY:} {env:COVERAGE:} {posargs}
-
-# note: oracle will need newer provisioning logic for oracle to work
-#
-#  {oracle}: python reap_oracle_dbs.py
+  {oracle}: python reap_oracle_dbs.py
 
 
 [testenv:pep8]