]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Refactor test provisioning to dialect-level files
authorGord Thompson <gord@gordthompson.com>
Mon, 13 Jan 2020 01:08:22 +0000 (20:08 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 26 Jan 2020 17:52:36 +0000 (12:52 -0500)
Fixes: #5085
<!-- Provide a general summary of your proposed changes in the Title field above -->

Move dialect-specific provisioning code to dialect-level copies of provision.py.

<!-- go over following points. check them with an `x` if they do apply, (they turn into clickable checkboxes once the PR is submitted, so no need to do everything at once)

-->

This pull request is:

- [ ] A documentation / typographical error fix
- Good to go, no issue or tests are needed
- [x] A short code fix
- please include the issue number, and create an issue if none exists, which
  must include a complete example of the issue.  one line code fixes without an
  issue and demonstration will not be accepted.
- Please include: `Fixes: #<issue number>` in the commit message
- please include tests.   one line code fixes without tests will not be accepted.
- [ ] A new feature implementation
- please include the issue number, and create an issue if none exists, which must
  include a complete example of how the feature would look.
- Please include: `Fixes: #<issue number>` in the commit message
- please include tests.

**Have a nice day!**

Closes: #5092
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/5092
Pull-request-sha: 25b9b7a9800549fb823576af8674e8d33ff4b2c1

Change-Id: Ie0b4a69aa472a60bdbd825e04c8595382bcc98e1
(cherry picked from commit 6fb7b527426b564302d6fd8ca11dfa7b78cc2e7a)

12 files changed:
lib/sqlalchemy/dialects/mssql/__init__.py
lib/sqlalchemy/dialects/mssql/provision.py [new file with mode: 0644]
lib/sqlalchemy/dialects/mysql/__init__.py
lib/sqlalchemy/dialects/mysql/provision.py [new file with mode: 0644]
lib/sqlalchemy/dialects/oracle/__init__.py
lib/sqlalchemy/dialects/oracle/provision.py [new file with mode: 0644]
lib/sqlalchemy/dialects/postgresql/__init__.py
lib/sqlalchemy/dialects/postgresql/provision.py [new file with mode: 0644]
lib/sqlalchemy/dialects/sqlite/__init__.py
lib/sqlalchemy/dialects/sqlite/provision.py [new file with mode: 0644]
lib/sqlalchemy/testing/provision.py
lib/sqlalchemy/testing/suite/test_reflection.py

index d8d577a658463a5fd336b557c7e3addf35557abd..0de9a74547969615054991436cf7fa67efe0d71e 100644 (file)
@@ -8,6 +8,7 @@
 from . import adodbapi  # noqa
 from . import base  # noqa
 from . import mxodbc  # noqa
+from . import provision  # noqa
 from . import pymssql  # noqa
 from . import pyodbc  # noqa
 from . import zxjdbc  # noqa
diff --git a/lib/sqlalchemy/dialects/mssql/provision.py b/lib/sqlalchemy/dialects/mssql/provision.py
new file mode 100644 (file)
index 0000000..558ad7a
--- /dev/null
@@ -0,0 +1,84 @@
+import logging
+
+from ... import create_engine
+from ... import exc
+from ...testing.provision import create_db
+from ...testing.provision import drop_db
+from ...testing.provision import run_reap_dbs
+from ...testing.provision import update_db_opts
+
+
+log = logging.getLogger(__name__)
+
+
+@update_db_opts.for_db("mssql")
+def _mssql_update_db_opts(db_url, db_opts):
+    db_opts["legacy_schema_aliasing"] = False
+
+
+@create_db.for_db("mssql")
+def _mssql_create_db(cfg, eng, ident):
+    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
+        conn.execute("create database %s" % ident)
+        conn.execute(
+            "ALTER DATABASE %s SET ALLOW_SNAPSHOT_ISOLATION ON" % ident
+        )
+        conn.execute(
+            "ALTER DATABASE %s SET READ_COMMITTED_SNAPSHOT ON" % ident
+        )
+        conn.execute("use %s" % ident)
+        conn.execute("create schema test_schema")
+        conn.execute("create schema test_schema_2")
+
+
+@drop_db.for_db("mssql")
+def _mssql_drop_db(cfg, eng, ident):
+    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
+        _mssql_drop_ignore(conn, ident)
+
+
+def _mssql_drop_ignore(conn, ident):
+    try:
+        # typically when this happens, we can't KILL the session anyway,
+        # so let the cleanup process drop the DBs
+        # for row in conn.execute(
+        #     "select session_id from sys.dm_exec_sessions "
+        #        "where database_id=db_id('%s')" % ident):
+        #    log.info("killing SQL server sesssion %s", row['session_id'])
+        #    conn.execute("kill %s" % row['session_id'])
+
+        conn.execute("drop database %s" % ident)
+        log.info("Reaped db: %s", ident)
+        return True
+    except exc.DatabaseError as err:
+        log.warning("couldn't drop db: %s", err)
+        return False
+
+
+@run_reap_dbs.for_db("mssql")
+def _reap_mssql_dbs(url, idents):
+    log.info("db reaper connecting to %r", url)
+    eng = create_engine(url)
+    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
+
+        log.info("identifiers in file: %s", ", ".join(idents))
+
+        to_reap = conn.execute(
+            "select d.name from sys.databases as d where name "
+            "like 'TEST_%' and not exists (select session_id "
+            "from sys.dm_exec_sessions "
+            "where database_id=d.database_id)"
+        )
+        all_names = {dbname.lower() for (dbname,) in to_reap}
+        to_drop = set()
+        for name in all_names:
+            if name in idents:
+                to_drop.add(name)
+
+        dropped = total = 0
+        for total, dbname in enumerate(to_drop, 1):
+            if _mssql_drop_ignore(conn, dbname):
+                dropped += 1
+        log.info(
+            "Dropped %d out of %d stale databases detected", dropped, total
+        )
index c9f3b43b47907f4fb3e129bd391b0272d97723b0..59bcab4b87668c6a56f1b53b57c38c5db272a447 100644 (file)
@@ -11,6 +11,7 @@ from . import gaerdbms  # noqa
 from . import mysqlconnector  # noqa
 from . import mysqldb  # noqa
 from . import oursql  # noqa
+from . import provision  # noqa
 from . import pymysql  # noqa
 from . import pyodbc  # noqa
 from . import zxjdbc  # noqa
diff --git a/lib/sqlalchemy/dialects/mysql/provision.py b/lib/sqlalchemy/dialects/mysql/provision.py
new file mode 100644 (file)
index 0000000..843b35c
--- /dev/null
@@ -0,0 +1,40 @@
+from ...testing.provision import configure_follower
+from ...testing.provision import create_db
+from ...testing.provision import drop_db
+from ...testing.provision import temp_table_keyword_args
+
+
+@create_db.for_db("mysql")
+def _mysql_create_db(cfg, eng, ident):
+    with eng.connect() as conn:
+        try:
+            _mysql_drop_db(cfg, conn, ident)
+        except Exception:
+            pass
+
+        conn.execute("CREATE DATABASE %s CHARACTER SET utf8mb4" % ident)
+        conn.execute(
+            "CREATE DATABASE %s_test_schema CHARACTER SET utf8mb4" % ident
+        )
+        conn.execute(
+            "CREATE DATABASE %s_test_schema_2 CHARACTER SET utf8mb4" % ident
+        )
+
+
+@configure_follower.for_db("mysql")
+def _mysql_configure_follower(config, ident):
+    config.test_schema = "%s_test_schema" % ident
+    config.test_schema_2 = "%s_test_schema_2" % ident
+
+
+@drop_db.for_db("mysql")
+def _mysql_drop_db(cfg, eng, ident):
+    with eng.connect() as conn:
+        conn.execute("DROP DATABASE %s_test_schema" % ident)
+        conn.execute("DROP DATABASE %s_test_schema_2" % ident)
+        conn.execute("DROP DATABASE %s" % ident)
+
+
+@temp_table_keyword_args.for_db("mysql")
+def _mysql_temp_table_keyword_args(cfg, eng):
+    return {"prefixes": ["TEMPORARY"]}
index 849dd6924a6591192f32dc89b16636b7cce0627e..19b8fac557407a5904db1de5c0a3dbd9047e49e1 100644 (file)
@@ -7,6 +7,7 @@
 
 from . import base  # noqa
 from . import cx_oracle  # noqa
+from . import provision  # noqa
 from . import zxjdbc  # noqa
 from .base import BFILE
 from .base import BINARY_DOUBLE
diff --git a/lib/sqlalchemy/dialects/oracle/provision.py b/lib/sqlalchemy/dialects/oracle/provision.py
new file mode 100644 (file)
index 0000000..8fc2ac2
--- /dev/null
@@ -0,0 +1,114 @@
+import logging
+
+from ... import create_engine
+from ... import exc
+from ...engine import url as sa_url
+from ...testing.provision import configure_follower
+from ...testing.provision import create_db
+from ...testing.provision import drop_db
+from ...testing.provision import follower_url_from_main
+from ...testing.provision import run_reap_dbs
+from ...testing.provision import temp_table_keyword_args
+from ...testing.provision import update_db_opts
+
+
+log = logging.getLogger(__name__)
+
+
+@create_db.for_db("oracle")
+def _oracle_create_db(cfg, eng, ident):
+    # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
+    # similar, so that the default tablespace is not "system"; reflection will
+    # fail otherwise
+    with eng.connect() as conn:
+        conn.execute("create user %s identified by xe" % ident)
+        conn.execute("create user %s_ts1 identified by xe" % ident)
+        conn.execute("create user %s_ts2 identified by xe" % ident)
+        conn.execute("grant dba to %s" % (ident,))
+        conn.execute("grant unlimited tablespace to %s" % ident)
+        conn.execute("grant unlimited tablespace to %s_ts1" % ident)
+        conn.execute("grant unlimited tablespace to %s_ts2" % ident)
+
+
+@configure_follower.for_db("oracle")
+def _oracle_configure_follower(config, ident):
+    config.test_schema = "%s_ts1" % ident
+    config.test_schema_2 = "%s_ts2" % ident
+
+
+def _ora_drop_ignore(conn, dbname):
+    try:
+        conn.execute("drop user %s cascade" % dbname)
+        log.info("Reaped db: %s", dbname)
+        return True
+    except exc.DatabaseError as err:
+        log.warning("couldn't drop db: %s", err)
+        return False
+
+
+@drop_db.for_db("oracle")
+def _oracle_drop_db(cfg, eng, ident):
+    with eng.connect() as conn:
+        # cx_Oracle seems to occasionally leak open connections when a large
+        # suite it run, even if we confirm we have zero references to
+        # connection objects.
+        # while there is a "kill session" command in Oracle,
+        # it unfortunately does not release the connection sufficiently.
+        _ora_drop_ignore(conn, ident)
+        _ora_drop_ignore(conn, "%s_ts1" % ident)
+        _ora_drop_ignore(conn, "%s_ts2" % ident)
+
+
+@update_db_opts.for_db("oracle")
+def _oracle_update_db_opts(db_url, db_opts):
+    pass
+
+
+@run_reap_dbs.for_db("oracle")
+def _reap_oracle_dbs(url, idents):
+    log.info("db reaper connecting to %r", url)
+    eng = create_engine(url)
+    with eng.connect() as conn:
+
+        log.info("identifiers in file: %s", ", ".join(idents))
+
+        to_reap = conn.execute(
+            "select u.username from all_users u where username "
+            "like 'TEST_%' and not exists (select username "
+            "from v$session where username=u.username)"
+        )
+        all_names = {username.lower() for (username,) in to_reap}
+        to_drop = set()
+        for name in all_names:
+            if name.endswith("_ts1") or name.endswith("_ts2"):
+                continue
+            elif name in idents:
+                to_drop.add(name)
+                if "%s_ts1" % name in all_names:
+                    to_drop.add("%s_ts1" % name)
+                if "%s_ts2" % name in all_names:
+                    to_drop.add("%s_ts2" % name)
+
+        dropped = total = 0
+        for total, username in enumerate(to_drop, 1):
+            if _ora_drop_ignore(conn, username):
+                dropped += 1
+        log.info(
+            "Dropped %d out of %d stale databases detected", dropped, total
+        )
+
+
+@follower_url_from_main.for_db("oracle")
+def _oracle_follower_url_from_main(url, ident):
+    url = sa_url.make_url(url)
+    url.username = ident
+    url.password = "xe"
+    return url
+
+
+@temp_table_keyword_args.for_db("oracle")
+def _oracle_temp_table_keyword_args(cfg, eng):
+    return {
+        "prefixes": ["GLOBAL TEMPORARY"],
+        "oracle_on_commit": "PRESERVE ROWS",
+    }
index 80875916a0dca6c966c3fda9bd93b297eda041c4..bd3f58530ab48c37e618d0c8e808054170b5f81d 100644 (file)
@@ -7,6 +7,7 @@
 
 from . import base
 from . import pg8000  # noqa
+from . import provision  # noqa
 from . import psycopg2  # noqa
 from . import psycopg2cffi  # noqa
 from . import pygresql  # noqa
diff --git a/lib/sqlalchemy/dialects/postgresql/provision.py b/lib/sqlalchemy/dialects/postgresql/provision.py
new file mode 100644 (file)
index 0000000..404da93
--- /dev/null
@@ -0,0 +1,67 @@
+import logging
+import time
+
+from ... import exc
+from ... import text
+from ...testing.provision import create_db
+from ...testing.provision import drop_db
+from ...testing.provision import temp_table_keyword_args
+
+
+log = logging.getLogger(__name__)
+
+
+@create_db.for_db("postgresql")
+def _pg_create_db(cfg, eng, ident):
+    template_db = cfg.options.postgresql_templatedb
+
+    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
+        try:
+            _pg_drop_db(cfg, conn, ident)
+        except Exception:
+            pass
+        if not template_db:
+            template_db = conn.scalar("select current_database()")
+
+        attempt = 0
+        while True:
+            try:
+                conn.execute(
+                    "CREATE DATABASE %s TEMPLATE %s" % (ident, template_db)
+                )
+            except exc.OperationalError as err:
+                attempt += 1
+                if attempt >= 3:
+                    raise
+                if "accessed by other users" in str(err):
+                    log.info(
+                        "Waiting to create %s, URI %r, "
+                        "template DB %s is in use sleeping for .5",
+                        ident,
+                        eng.url,
+                        template_db,
+                    )
+                    time.sleep(0.5)
+            except:
+                raise
+            else:
+                break
+
+
+@drop_db.for_db("postgresql")
+def _pg_drop_db(cfg, eng, ident):
+    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
+        conn.execute(
+            text(
+                "select pg_terminate_backend(pid) from pg_stat_activity "
+                "where usename=current_user and pid != pg_backend_pid() "
+                "and datname=:dname"
+            ),
+            dname=ident,
+        )
+        conn.execute("DROP DATABASE %s" % ident)
+
+
+@temp_table_keyword_args.for_db("postgresql")
+def _postgresql_temp_table_keyword_args(cfg, eng):
+    return {"prefixes": ["TEMPORARY"]}
index 142131f631bddeed30951f5caca9a58d763414ff..c35cb9251cca1aa406a7eefefd03a0981195250e 100644 (file)
@@ -6,6 +6,7 @@
 # the MIT License: http://www.opensource.org/licenses/mit-license.php
 
 from . import base  # noqa
+from . import provision  # noqa
 from . import pysqlcipher  # noqa
 from . import pysqlite  # noqa
 from .base import BLOB
diff --git a/lib/sqlalchemy/dialects/sqlite/provision.py b/lib/sqlalchemy/dialects/sqlite/provision.py
new file mode 100644 (file)
index 0000000..d4a5ae9
--- /dev/null
@@ -0,0 +1,54 @@
+import os
+
+from ...engine import url as sa_url
+from ...testing.provision import create_db
+from ...testing.provision import drop_db
+from ...testing.provision import follower_url_from_main
+from ...testing.provision import post_configure_engine
+from ...testing.provision import temp_table_keyword_args
+
+
+@follower_url_from_main.for_db("sqlite")
+def _sqlite_follower_url_from_main(url, ident):
+    url = sa_url.make_url(url)
+    if not url.database or url.database == ":memory:":
+        return url
+    else:
+        return sa_url.make_url("sqlite:///%s.db" % ident)
+
+
+@post_configure_engine.for_db("sqlite")
+def _sqlite_post_configure_engine(url, engine, follower_ident):
+    from sqlalchemy import event
+
+    @event.listens_for(engine, "connect")
+    def connect(dbapi_connection, connection_record):
+        # use file DBs in all cases, memory acts kind of strangely
+        # as an attached
+        if not follower_ident:
+            dbapi_connection.execute(
+                'ATTACH DATABASE "test_schema.db" AS test_schema'
+            )
+        else:
+            dbapi_connection.execute(
+                'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
+                % follower_ident
+            )
+
+
+@create_db.for_db("sqlite")
+def _sqlite_create_db(cfg, eng, ident):
+    pass
+
+
+@drop_db.for_db("sqlite")
+def _sqlite_drop_db(cfg, eng, ident):
+    if ident:
+        os.remove("%s_test_schema.db" % ident)
+    else:
+        os.remove("%s.db" % ident)
+
+
+@temp_table_keyword_args.for_db("sqlite")
+def _sqlite_temp_table_keyword_args(cfg, eng):
+    return {"prefixes": ["TEMPORARY"]}
index 70ace05110e851aead79fc97b90309dabb972877..6e2e1ccf50f76933492e1da3a6c7c771724154be 100644 (file)
@@ -1,13 +1,8 @@
 import collections
 import logging
-import os
-import time
 
 from . import config
 from . import engines
-from .. import create_engine
-from .. import exc
-from .. import text
 from ..engine import url as sa_url
 from ..util import compat
 
@@ -49,33 +44,32 @@ class register(object):
 def create_follower_db(follower_ident):
     for cfg in _configs_for_db_operation():
         log.info("CREATE database %s, URI %r", follower_ident, cfg.db.url)
-        _create_db(cfg, cfg.db, follower_ident)
-
-
-def configure_follower(follower_ident):
-    for cfg in config.Config.all_configs():
-        _configure_follower(cfg, follower_ident)
+        create_db(cfg, cfg.db, follower_ident)
 
 
 def setup_config(db_url, options, file_config, follower_ident):
+    # load the dialect, which should also have it set up its provision
+    # hooks
+
+    sa_url.make_url(db_url).get_dialect()
     if follower_ident:
-        db_url = _follower_url_from_main(db_url, follower_ident)
+        db_url = follower_url_from_main(db_url, follower_ident)
     db_opts = {}
-    _update_db_opts(db_url, db_opts)
+    update_db_opts(db_url, db_opts)
     eng = engines.testing_engine(db_url, db_opts)
-    _post_configure_engine(db_url, eng, follower_ident)
+    post_configure_engine(db_url, eng, follower_ident)
     eng.connect().close()
 
     cfg = config.Config.register(eng, db_opts, options, file_config)
     if follower_ident:
-        _configure_follower(cfg, follower_ident)
+        configure_follower(cfg, follower_ident)
     return cfg
 
 
 def drop_follower_db(follower_ident):
     for cfg in _configs_for_db_operation():
         log.info("DROP database %s, URI %r", follower_ident, cfg.db.url)
-        _drop_db(cfg, cfg.db, follower_ident)
+        drop_db(cfg, cfg.db, follower_ident)
 
 
 def _configs_for_db_operation():
@@ -98,211 +92,66 @@ def _configs_for_db_operation():
 
 
 @register.init
-def _create_db(cfg, eng, ident):
+def create_db(cfg, eng, ident):
+    """Dynamically create a database for testing.
+
+    Used when a test run will employ multiple processes, e.g., when run
+    via `tox` or `py.test -n4`.
+    """
     raise NotImplementedError("no DB creation routine for cfg: %s" % eng.url)
 
 
 @register.init
-def _drop_db(cfg, eng, ident):
+def drop_db(cfg, eng, ident):
+    """Drop a database that we dynamically created for testing."""
     raise NotImplementedError("no DB drop routine for cfg: %s" % eng.url)
 
 
 @register.init
-def _update_db_opts(db_url, db_opts):
+def update_db_opts(db_url, db_opts):
+    """Set database options (db_opts) for a test database that we created.
+    """
     pass
 
 
 @register.init
-def _configure_follower(cfg, ident):
-    pass
-
+def post_configure_engine(url, engine, follower_ident):
+    """Perform extra steps after configuring an engine for testing.
 
-@register.init
-def _post_configure_engine(url, engine, follower_ident):
+    (For the internal dialects, currently only used by sqlite.)
+    """
     pass
 
 
 @register.init
-def _follower_url_from_main(url, ident):
+def follower_url_from_main(url, ident):
+    """Create a connection URL for a dynamically-created test database.
+
+    :param url: the connection URL specified when the test run was invoked
+    :param ident: the pytest-xdist "worker identifier" to be used as the
+                  database name
+    """
     url = sa_url.make_url(url)
     url.database = ident
     return url
 
 
-@_update_db_opts.for_db("mssql")
-def _mssql_update_db_opts(db_url, db_opts):
-    db_opts["legacy_schema_aliasing"] = False
-
-
-@_follower_url_from_main.for_db("sqlite")
-def _sqlite_follower_url_from_main(url, ident):
-    url = sa_url.make_url(url)
-    if not url.database or url.database == ":memory:":
-        return url
-    else:
-        return sa_url.make_url("sqlite:///%s.db" % ident)
-
-
-@_post_configure_engine.for_db("sqlite")
-def _sqlite_post_configure_engine(url, engine, follower_ident):
-    from sqlalchemy import event
-
-    @event.listens_for(engine, "connect")
-    def connect(dbapi_connection, connection_record):
-        # use file DBs in all cases, memory acts kind of strangely
-        # as an attached
-        if not follower_ident:
-            dbapi_connection.execute(
-                'ATTACH DATABASE "test_schema.db" AS test_schema'
-            )
-        else:
-            dbapi_connection.execute(
-                'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
-                % follower_ident
-            )
-
-
-@_create_db.for_db("postgresql")
-def _pg_create_db(cfg, eng, ident):
-    template_db = cfg.options.postgresql_templatedb
-
-    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
-        try:
-            _pg_drop_db(cfg, conn, ident)
-        except Exception:
-            pass
-        if not template_db:
-            template_db = conn.scalar("select current_database()")
-
-        attempt = 0
-        while True:
-            try:
-                conn.execute(
-                    "CREATE DATABASE %s TEMPLATE %s" % (ident, template_db)
-                )
-            except exc.OperationalError as err:
-                attempt += 1
-                if attempt >= 3:
-                    raise
-                if "accessed by other users" in str(err):
-                    log.info(
-                        "Waiting to create %s, URI %r, "
-                        "template DB %s is in use sleeping for .5",
-                        ident,
-                        eng.url,
-                        template_db,
-                    )
-                    time.sleep(0.5)
-            except:
-                raise
-            else:
-                break
-
-
-@_create_db.for_db("mysql")
-def _mysql_create_db(cfg, eng, ident):
-    with eng.connect() as conn:
-        try:
-            _mysql_drop_db(cfg, conn, ident)
-        except Exception:
-            pass
-
-        conn.execute("CREATE DATABASE %s CHARACTER SET utf8mb4" % ident)
-        conn.execute(
-            "CREATE DATABASE %s_test_schema CHARACTER SET utf8mb4" % ident
-        )
-        conn.execute(
-            "CREATE DATABASE %s_test_schema_2 CHARACTER SET utf8mb4" % ident
-        )
-
-
-@_configure_follower.for_db("mysql")
-def _mysql_configure_follower(config, ident):
-    config.test_schema = "%s_test_schema" % ident
-    config.test_schema_2 = "%s_test_schema_2" % ident
-
-
-@_create_db.for_db("sqlite")
-def _sqlite_create_db(cfg, eng, ident):
+@register.init
+def configure_follower(cfg, ident):
+    """Create dialect-specific config settings for a follower database."""
     pass
 
 
-@_drop_db.for_db("postgresql")
-def _pg_drop_db(cfg, eng, ident):
-    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
-        conn.execute(
-            text(
-                "select pg_terminate_backend(pid) from pg_stat_activity "
-                "where usename=current_user and pid != pg_backend_pid() "
-                "and datname=:dname"
-            ),
-            dname=ident,
-        )
-        conn.execute("DROP DATABASE %s" % ident)
-
-
-@_drop_db.for_db("sqlite")
-def _sqlite_drop_db(cfg, eng, ident):
-    if ident:
-        os.remove("%s_test_schema.db" % ident)
-    else:
-        os.remove("%s.db" % ident)
-
-
-@_drop_db.for_db("mysql")
-def _mysql_drop_db(cfg, eng, ident):
-    with eng.connect() as conn:
-        conn.execute("DROP DATABASE %s_test_schema" % ident)
-        conn.execute("DROP DATABASE %s_test_schema_2" % ident)
-        conn.execute("DROP DATABASE %s" % ident)
-
-
-@_create_db.for_db("oracle")
-def _oracle_create_db(cfg, eng, ident):
-    # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
-    # similar, so that the default tablespace is not "system"; reflection will
-    # fail otherwise
-    with eng.connect() as conn:
-        conn.execute("create user %s identified by xe" % ident)
-        conn.execute("create user %s_ts1 identified by xe" % ident)
-        conn.execute("create user %s_ts2 identified by xe" % ident)
-        conn.execute("grant dba to %s" % (ident,))
-        conn.execute("grant unlimited tablespace to %s" % ident)
-        conn.execute("grant unlimited tablespace to %s_ts1" % ident)
-        conn.execute("grant unlimited tablespace to %s_ts2" % ident)
-
-
-@_configure_follower.for_db("oracle")
-def _oracle_configure_follower(config, ident):
-    config.test_schema = "%s_ts1" % ident
-    config.test_schema_2 = "%s_ts2" % ident
-
-
-def _ora_drop_ignore(conn, dbname):
-    try:
-        conn.execute("drop user %s cascade" % dbname)
-        log.info("Reaped db: %s", dbname)
-        return True
-    except exc.DatabaseError as err:
-        log.warning("couldn't drop db: %s", err)
-        return False
-
-
-@_drop_db.for_db("oracle")
-def _oracle_drop_db(cfg, eng, ident):
-    with eng.connect() as conn:
-        # cx_Oracle seems to occasionally leak open connections when a large
-        # suite it run, even if we confirm we have zero references to
-        # connection objects.
-        # while there is a "kill session" command in Oracle,
-        # it unfortunately does not release the connection sufficiently.
-        _ora_drop_ignore(conn, ident)
-        _ora_drop_ignore(conn, "%s_ts1" % ident)
-        _ora_drop_ignore(conn, "%s_ts2" % ident)
-
-
-@_update_db_opts.for_db("oracle")
-def _oracle_update_db_opts(db_url, db_opts):
+@register.init
+def run_reap_dbs(url, ident):
+    """Remove databases that were created during the test process, after the
+    process has ended.
+
+    This is an optional step that is invoked for certain backends that do not
+    reliably release locks on the database as long as a process is still in
+    use. For the internal dialects, this is currently only necessary for
+    mssql and oracle.
+    """
     pass
 
 
@@ -322,118 +171,20 @@ def reap_dbs(idents_file):
             idents[url_key].add(db_name)
 
     for url_key in urls:
-        backend = url_key[0]
         url = list(urls[url_key])[0]
         ident = idents[url_key]
-        if backend == "oracle":
-            _reap_oracle_dbs(url, ident)
-        elif backend == "mssql":
-            _reap_mssql_dbs(url, ident)
-
-
-def _reap_oracle_dbs(url, idents):
-    log.info("db reaper connecting to %r", url)
-    eng = create_engine(url)
-    with eng.connect() as conn:
-
-        log.info("identifiers in file: %s", ", ".join(idents))
-
-        to_reap = conn.execute(
-            "select u.username from all_users u where username "
-            "like 'TEST_%' and not exists (select username "
-            "from v$session where username=u.username)"
-        )
-        all_names = {username.lower() for (username,) in to_reap}
-        to_drop = set()
-        for name in all_names:
-            if name.endswith("_ts1") or name.endswith("_ts2"):
-                continue
-            elif name in idents:
-                to_drop.add(name)
-                if "%s_ts1" % name in all_names:
-                    to_drop.add("%s_ts1" % name)
-                if "%s_ts2" % name in all_names:
-                    to_drop.add("%s_ts2" % name)
-
-        dropped = total = 0
-        for total, username in enumerate(to_drop, 1):
-            if _ora_drop_ignore(conn, username):
-                dropped += 1
-        log.info(
-            "Dropped %d out of %d stale databases detected", dropped, total
-        )
-
-
-@_follower_url_from_main.for_db("oracle")
-def _oracle_follower_url_from_main(url, ident):
-    url = sa_url.make_url(url)
-    url.username = ident
-    url.password = "xe"
-    return url
+        run_reap_dbs(url, ident)
 
 
-@_create_db.for_db("mssql")
-def _mssql_create_db(cfg, eng, ident):
-    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
-        conn.execute("create database %s" % ident)
-        conn.execute(
-            "ALTER DATABASE %s SET ALLOW_SNAPSHOT_ISOLATION ON" % ident
-        )
-        conn.execute(
-            "ALTER DATABASE %s SET READ_COMMITTED_SNAPSHOT ON" % ident
-        )
-        conn.execute("use %s" % ident)
-        conn.execute("create schema test_schema")
-        conn.execute("create schema test_schema_2")
-
-
-@_drop_db.for_db("mssql")
-def _mssql_drop_db(cfg, eng, ident):
-    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
-        _mssql_drop_ignore(conn, ident)
-
-
-def _mssql_drop_ignore(conn, ident):
-    try:
-        # typically when this happens, we can't KILL the session anyway,
-        # so let the cleanup process drop the DBs
-        # for row in conn.execute(
-        #     "select session_id from sys.dm_exec_sessions "
-        #        "where database_id=db_id('%s')" % ident):
-        #    log.info("killing SQL server sesssion %s", row['session_id'])
-        #    conn.execute("kill %s" % row['session_id'])
-
-        conn.execute("drop database %s" % ident)
-        log.info("Reaped db: %s", ident)
-        return True
-    except exc.DatabaseError as err:
-        log.warning("couldn't drop db: %s", err)
-        return False
-
-
-def _reap_mssql_dbs(url, idents):
-    log.info("db reaper connecting to %r", url)
-    eng = create_engine(url)
-    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
-
-        log.info("identifiers in file: %s", ", ".join(idents))
-
-        to_reap = conn.execute(
-            "select d.name from sys.databases as d where name "
-            "like 'TEST_%' and not exists (select session_id "
-            "from sys.dm_exec_sessions "
-            "where database_id=d.database_id)"
-        )
-        all_names = {dbname.lower() for (dbname,) in to_reap}
-        to_drop = set()
-        for name in all_names:
-            if name in idents:
-                to_drop.add(name)
-
-        dropped = total = 0
-        for total, dbname in enumerate(to_drop, 1):
-            if _mssql_drop_ignore(conn, dbname):
-                dropped += 1
-        log.info(
-            "Dropped %d out of %d stale databases detected", dropped, total
-        )
+@register.init
+def temp_table_keyword_args(cfg, eng):
+    """Specify keyword arguments for creating a temporary Table.
+
+    Dialect-specific implementations of this method will return the
+    kwargs that are passed to the Table method when creating a temporary
+    table for testing, e.g., in the define_temp_tables method of the
+    ComponentReflectionTest class in suite/test_reflection.py
+    """
+    raise NotImplementedError(
+        "no temp table keyword args routine for cfg: %s" % eng.url
+    )
index a5302cfd208533636d1cdcd761864c8424c4201f..f9f427ff9768c459dfacfaf1dc18452d907971d2 100644 (file)
@@ -9,6 +9,7 @@ from .. import eq_
 from .. import expect_warnings
 from .. import fixtures
 from .. import is_
+from ..provision import temp_table_keyword_args
 from ..schema import Column
 from ..schema import Table
 from ... import event
@@ -245,16 +246,7 @@ class ComponentReflectionTest(fixtures.TablesTest):
 
     @classmethod
     def define_temp_tables(cls, metadata):
-        # cheat a bit, we should fix this with some dialect-level
-        # temp table fixture
-        if testing.against("oracle"):
-            kw = {
-                "prefixes": ["GLOBAL TEMPORARY"],
-                "oracle_on_commit": "PRESERVE ROWS",
-            }
-        else:
-            kw = {"prefixes": ["TEMPORARY"]}
-
+        kw = temp_table_keyword_args(config, config.db)
         user_tmp = Table(
             "user_tmp",
             metadata,