+from sqlalchemy.testing import config # noqa
+from sqlalchemy.testing import engines # noqa
+from sqlalchemy.testing import exclusions # noqa
+from sqlalchemy.testing import mock # noqa
+from sqlalchemy.testing import provide_metadata # noqa
+from sqlalchemy.testing.config import requirements as requires # noqa
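+# the names above are re-exported from SQLAlchemy's own testing framework so
+# that existing "from alembic.testing import ..." imports continue to work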
+
from alembic import util # noqa
from .assertions import assert_raises # noqa
from .assertions import assert_raises_message # noqa
from .assertions import is_ # noqa
from .assertions import is_not_ # noqa
from .assertions import ne_ # noqa
-from .config import requirements as requires # noqa
from .fixtures import TestBase # noqa
-from .util import provide_metadata # noqa
from __future__ import absolute_import
-import contextlib
import re
-import warnings
-from sqlalchemy import exc as sa_exc
from sqlalchemy.engine import default
+from sqlalchemy.testing.assertions import _expect_warnings
from sqlalchemy.testing.assertions import assert_raises # noqa
from sqlalchemy.testing.assertions import assert_raises_message # noqa
from sqlalchemy.testing.assertions import eq_ # noqa
from sqlalchemy.testing.assertions import ne_ # noqa
from sqlalchemy.util import decorator
-from . import config
-from . import mock
-from .exclusions import db_spec
-from .. import util
from ..util.compat import py3k
-from ..util.compat import text_type
def eq_ignore_whitespace(a, b, msg=None):
+    # sqlalchemy.testing.assertions has this function,
+    # but not with the special "!U" detection part
+
a = re.sub(r"^\s+?|\n", "", a)
a = re.sub(r" {2,}", " ", a)
b = re.sub(r"^\s+?|\n", "", b)
assert a == b, msg or "%r != %r" % (a, b)
-def assert_compiled(element, assert_string, dialect=None):
- dialect = _get_dialect(dialect)
- eq_(
- text_type(element.compile(dialect=dialect))
- .replace("\n", "")
- .replace("\t", ""),
- assert_string.replace("\n", "").replace("\t", ""),
- )
-
-
_dialect_mods = {}
Note that the test suite sets SAWarning warnings to raise exceptions.
"""
- return _expect_warnings(sa_exc.SAWarning, messages, **kw)
-
-
-@contextlib.contextmanager
-def expect_warnings_on(db, *messages, **kw):
- """Context manager which expects one or more warnings on specific
- dialects.
-
- The expect version **asserts** that the warnings were in fact seen.
-
- """
- spec = db_spec(db)
-
- if isinstance(db, util.string_types) and not spec(config._current):
- yield
- else:
- with expect_warnings(*messages, **kw):
- yield
-
-
-def emits_warning(*messages):
- """Decorator form of expect_warnings().
-
- Note that emits_warning does **not** assert that the warnings
- were in fact seen.
-
- """
-
- @decorator
- def decorate(fn, *args, **kw):
- with expect_warnings(assert_=False, *messages):
- return fn(*args, **kw)
-
- return decorate
+ return _expect_warnings(Warning, messages, **kw)
def emits_python_deprecation_warning(*messages):
return fn(*args, **kw)
return decorate
-
-
-def emits_warning_on(db, *messages):
- """Mark a test as emitting a warning on a specific dialect.
-
- With no arguments, squelches all SAWarning failures. Or pass one or more
- strings; these will be matched to the root of the warning description by
- warnings.filterwarnings().
-
- Note that emits_warning_on does **not** assert that the warnings
- were in fact seen.
-
- """
-
- @decorator
- def decorate(fn, *args, **kw):
- with expect_warnings_on(db, *messages):
- return fn(*args, **kw)
-
- return decorate
-
-
-@contextlib.contextmanager
-def _expect_warnings(exc_cls, messages, regex=True, assert_=True):
-
- if regex:
- filters = [re.compile(msg, re.I) for msg in messages]
- else:
- filters = messages
-
- seen = set(filters)
-
- real_warn = warnings.warn
-
- def our_warn(msg, exception=None, *arg, **kw):
- if exception and not issubclass(exception, exc_cls):
- return real_warn(msg, exception, *arg, **kw)
-
- if not filters:
- return
-
- for filter_ in filters:
- if (regex and filter_.match(msg)) or (
- not regex and filter_ == msg
- ):
- seen.discard(filter_)
- break
- else:
- if exception is None:
- real_warn(msg, *arg, **kw)
- else:
- real_warn(msg, exception, *arg, **kw)
-
- with mock.patch("warnings.warn", our_warn):
- yield
-
- if assert_:
- assert not seen, "Warnings were not seen: %s" % ", ".join(
- "%r" % (s.pattern if regex else s) for s in seen
- )
+++ /dev/null
-def get_url_driver_name(url):
- if "+" not in url.drivername:
- return url.get_dialect().driver
- else:
- return url.drivername.split("+")[1]
-
-
-def get_url_backend_name(url):
- if "+" not in url.drivername:
- return url.drivername
- else:
- return url.drivername.split("+")[0]
+++ /dev/null
-# testing/config.py
-# Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""NOTE: copied/adapted from SQLAlchemy master for backwards compatibility;
- this should be removable when Alembic targets SQLAlchemy 1.0.0
-"""
-
-import collections
-
-requirements = None
-db = None
-db_url = None
-db_opts = None
-file_config = None
-test_schema = None
-test_schema_2 = None
-_current = None
-
-
-class Config(object):
- def __init__(self, db, db_opts, options, file_config):
- self._set_name(db)
- self.db = db
- self.db_opts = db_opts
- self.options = options
- self.file_config = file_config
- self.test_schema = "test_schema"
- self.test_schema_2 = "test_schema_2"
-
- _stack = collections.deque()
- _configs = set()
-
- def _set_name(self, db):
- if db.dialect.server_version_info:
- svi = ".".join(str(tok) for tok in db.dialect.server_version_info)
- self.name = "%s+%s_[%s]" % (db.name, db.driver, svi)
- else:
- self.name = "%s+%s" % (db.name, db.driver)
-
- @classmethod
- def register(cls, db, db_opts, options, file_config):
- """add a config as one of the global configs.
-
- If there are no configs set up yet, this config also
- gets set as the "_current".
- """
- cfg = Config(db, db_opts, options, file_config)
- cls._configs.add(cfg)
- return cfg
-
- @classmethod
- def set_as_current(cls, config):
- global db, _current, db_url, test_schema, test_schema_2, db_opts
- _current = config
- db_url = config.db.url
- db_opts = config.db_opts
- test_schema = config.test_schema
- test_schema_2 = config.test_schema_2
- db = config.db
-
- @classmethod
- def push_engine(cls, db):
- assert _current, "Can't push without a default Config set up"
- cls.push(
- Config(
- db, _current.db_opts, _current.options, _current.file_config
- )
- )
-
- @classmethod
- def push(cls, config):
- cls._stack.append(_current)
- cls.set_as_current(config)
-
- @classmethod
- def reset(cls):
- if cls._stack:
- cls.set_as_current(cls._stack[0])
- cls._stack.clear()
-
- @classmethod
- def all_configs(cls):
- return cls._configs
-
- @classmethod
- def all_dbs(cls):
- for cfg in cls.all_configs():
- yield cfg.db
+++ /dev/null
-# testing/engines.py
-# Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""NOTE: copied/adapted from SQLAlchemy master for backwards compatibility;
- this should be removable when Alembic targets SQLAlchemy 1.0.0.
-"""
-
-from __future__ import absolute_import
-
-from . import config
-
-
-def testing_engine(url=None, options=None):
- """Produce an engine configured by --options with optional overrides."""
-
- from sqlalchemy import create_engine
-
- url = url or config.db.url
- if options is None:
- options = config.db_opts
-
- engine = create_engine(url, **options)
-
- return engine
import shutil
import textwrap
-from . import engines
-from . import provision
+from sqlalchemy.testing import engines
+from sqlalchemy.testing import provision
+
from .. import util
from ..script import Script
from ..script import ScriptDirectory
+++ /dev/null
-# testing/exclusions.py
-# Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""NOTE: copied/adapted from SQLAlchemy master for backwards compatibility;
- this should be removable when Alembic targets SQLAlchemy 1.0.0
-"""
-
-
-import contextlib
-import operator
-
-from sqlalchemy import util
-from sqlalchemy.util import decorator
-
-from . import config
-from .compat import get_url_backend_name
-from .compat import get_url_driver_name
-from .plugin.plugin_base import SkipTest
-from ..util import compat
-
-
-def skip_if(predicate, reason=None):
- rule = compound()
- pred = _as_predicate(predicate, reason)
- rule.skips.add(pred)
- return rule
-
-
-def fails_if(predicate, reason=None):
- rule = compound()
- pred = _as_predicate(predicate, reason)
- rule.fails.add(pred)
- return rule
-
-
-class compound(object):
- def __init__(self):
- self.fails = set()
- self.skips = set()
- self.tags = set()
-
- def __add__(self, other):
- return self.add(other)
-
- def add(self, *others):
- copy = compound()
- copy.fails.update(self.fails)
- copy.skips.update(self.skips)
- copy.tags.update(self.tags)
- for other in others:
- copy.fails.update(other.fails)
- copy.skips.update(other.skips)
- copy.tags.update(other.tags)
- return copy
-
- def not_(self):
- copy = compound()
- copy.fails.update(NotPredicate(fail) for fail in self.fails)
- copy.skips.update(NotPredicate(skip) for skip in self.skips)
- copy.tags.update(self.tags)
- return copy
-
- @property
- def enabled(self):
- return self.enabled_for_config(config._current)
-
- def enabled_for_config(self, config):
- for predicate in self.skips.union(self.fails):
- if predicate(config):
- return False
- else:
- return True
-
- def matching_config_reasons(self, config):
- return [
- predicate._as_string(config)
- for predicate in self.skips.union(self.fails)
- if predicate(config)
- ]
-
- def include_test(self, include_tags, exclude_tags):
- return bool(
- not self.tags.intersection(exclude_tags)
- and (not include_tags or self.tags.intersection(include_tags))
- )
-
- def _extend(self, other):
- self.skips.update(other.skips)
- self.fails.update(other.fails)
- self.tags.update(other.tags)
-
- def __call__(self, fn):
- if hasattr(fn, "_sa_exclusion_extend"):
- fn._sa_exclusion_extend._extend(self)
- return fn
-
- @decorator
- def decorate(fn, *args, **kw):
- return self._do(config._current, fn, *args, **kw)
-
- decorated = decorate(fn)
- decorated._sa_exclusion_extend = self
- return decorated
-
- @contextlib.contextmanager
- def fail_if(self):
- all_fails = compound()
- all_fails.fails.update(self.skips.union(self.fails))
-
- try:
- yield
- except Exception as ex:
- all_fails._expect_failure(config._current, ex)
- else:
- all_fails._expect_success(config._current)
-
- def _do(self, config, fn, *args, **kw):
- for skip in self.skips:
- if skip(config):
- msg = "'%s' : %s" % (fn.__name__, skip._as_string(config))
- raise SkipTest(msg)
-
- try:
- return_value = fn(*args, **kw)
- except Exception as ex:
- self._expect_failure(config, ex, name=fn.__name__)
- else:
- self._expect_success(config, name=fn.__name__)
- return return_value
-
- def _expect_failure(self, config, ex, name="block"):
- for fail in self.fails:
- if fail(config):
- print(
- (
- "%s failed as expected (%s): %s "
- % (name, fail._as_string(config), str(ex))
- )
- )
- break
- else:
- compat.raise_from_cause(ex)
-
- def _expect_success(self, config, name="block"):
- if not self.fails:
- return
- for fail in self.fails:
- if not fail(config):
- break
- else:
- raise AssertionError(
- "Unexpected success for '%s' (%s)"
- % (
- name,
- " and ".join(
- fail._as_string(config) for fail in self.fails
- ),
- )
- )
-
-
-def requires_tag(tagname):
- return tags([tagname])
-
-
-def tags(tagnames):
- comp = compound()
- comp.tags.update(tagnames)
- return comp
-
-
-def only_if(predicate, reason=None):
- predicate = _as_predicate(predicate)
- return skip_if(NotPredicate(predicate), reason)
-
-
-def succeeds_if(predicate, reason=None):
- predicate = _as_predicate(predicate)
- return fails_if(NotPredicate(predicate), reason)
-
-
-class Predicate(object):
- @classmethod
- def as_predicate(cls, predicate, description=None):
- if isinstance(predicate, compound):
- return cls.as_predicate(predicate.fails.union(predicate.skips))
-
- elif isinstance(predicate, Predicate):
- if description and predicate.description is None:
- predicate.description = description
- return predicate
- elif isinstance(predicate, (list, set)):
- return OrPredicate(
- [cls.as_predicate(pred) for pred in predicate], description
- )
- elif isinstance(predicate, tuple):
- return SpecPredicate(*predicate)
- elif isinstance(predicate, compat.string_types):
- tokens = predicate.split(" ", 2)
- op = spec = None
- db = tokens.pop(0)
- if tokens:
- op = tokens.pop(0)
- if tokens:
- spec = tuple(int(d) for d in tokens.pop(0).split("."))
- return SpecPredicate(db, op, spec, description=description)
- elif util.callable(predicate):
- return LambdaPredicate(predicate, description)
- else:
- assert False, "unknown predicate type: %s" % predicate
-
- def _format_description(self, config, negate=False):
- bool_ = self(config)
- if negate:
- bool_ = not negate
- return self.description % {
- "driver": get_url_driver_name(config.db.url),
- "database": get_url_backend_name(config.db.url),
- "doesnt_support": "doesn't support" if bool_ else "does support",
- "does_support": "does support" if bool_ else "doesn't support",
- }
-
- def _as_string(self, config=None, negate=False):
- raise NotImplementedError()
-
-
-class BooleanPredicate(Predicate):
- def __init__(self, value, description=None):
- self.value = value
- self.description = description or "boolean %s" % value
-
- def __call__(self, config):
- return self.value
-
- def _as_string(self, config, negate=False):
- return self._format_description(config, negate=negate)
-
-
-class SpecPredicate(Predicate):
- def __init__(self, db, op=None, spec=None, description=None):
- self.db = db
- self.op = op
- self.spec = spec
- self.description = description
-
- _ops = {
- "<": operator.lt,
- ">": operator.gt,
- "==": operator.eq,
- "!=": operator.ne,
- "<=": operator.le,
- ">=": operator.ge,
- "in": operator.contains,
- "between": lambda val, pair: val >= pair[0] and val <= pair[1],
- }
-
- def __call__(self, config):
- engine = config.db
-
- if "+" in self.db:
- dialect, driver = self.db.split("+")
- else:
- dialect, driver = self.db, None
-
- if dialect and engine.name != dialect:
- return False
- if driver is not None and engine.driver != driver:
- return False
-
- if self.op is not None:
- assert driver is None, "DBAPI version specs not supported yet"
-
- version = _server_version(engine)
- oper = (
- hasattr(self.op, "__call__") and self.op or self._ops[self.op]
- )
- return oper(version, self.spec)
- else:
- return True
-
- def _as_string(self, config, negate=False):
- if self.description is not None:
- return self._format_description(config)
- elif self.op is None:
- if negate:
- return "not %s" % self.db
- else:
- return "%s" % self.db
- else:
- if negate:
- return "not %s %s %s" % (self.db, self.op, self.spec)
- else:
- return "%s %s %s" % (self.db, self.op, self.spec)
-
-
-class LambdaPredicate(Predicate):
- def __init__(self, lambda_, description=None, args=None, kw=None):
- spec = compat.inspect_getargspec(lambda_)
- if not spec[0]:
- self.lambda_ = lambda db: lambda_()
- else:
- self.lambda_ = lambda_
- self.args = args or ()
- self.kw = kw or {}
- if description:
- self.description = description
- elif lambda_.__doc__:
- self.description = lambda_.__doc__
- else:
- self.description = "custom function"
-
- def __call__(self, config):
- return self.lambda_(config)
-
- def _as_string(self, config, negate=False):
- return self._format_description(config)
-
-
-class NotPredicate(Predicate):
- def __init__(self, predicate, description=None):
- self.predicate = predicate
- self.description = description
-
- def __call__(self, config):
- return not self.predicate(config)
-
- def _as_string(self, config, negate=False):
- if self.description:
- return self._format_description(config, not negate)
- else:
- return self.predicate._as_string(config, not negate)
-
-
-class OrPredicate(Predicate):
- def __init__(self, predicates, description=None):
- self.predicates = predicates
- self.description = description
-
- def __call__(self, config):
- for pred in self.predicates:
- if pred(config):
- return True
- return False
-
- def _eval_str(self, config, negate=False):
- if negate:
- conjunction = " and "
- else:
- conjunction = " or "
- return conjunction.join(
- p._as_string(config, negate=negate) for p in self.predicates
- )
-
- def _negation_str(self, config):
- if self.description is not None:
- return "Not " + self._format_description(config)
- else:
- return self._eval_str(config, negate=True)
-
- def _as_string(self, config, negate=False):
- if negate:
- return self._negation_str(config)
- else:
- if self.description is not None:
- return self._format_description(config)
- else:
- return self._eval_str(config)
-
-
-_as_predicate = Predicate.as_predicate
-
-
-def _is_excluded(db, op, spec):
- return SpecPredicate(db, op, spec)(config._current)
-
-
-def _server_version(engine):
- """Return a server_version_info tuple."""
-
- # force metadata to be retrieved
- conn = engine.connect()
- version = getattr(engine.dialect, "server_version_info", ())
- conn.close()
- return version
-
-
-def db_spec(*dbs):
- return OrPredicate([Predicate.as_predicate(db) for db in dbs])
-
-
-def open(): # noqa
- return skip_if(BooleanPredicate(False, "mark as execute"))
-
-
-def closed():
- return skip_if(BooleanPredicate(True, "marked as skip"))
-
-
-def fails(msg=None):
- return fails_if(BooleanPredicate(True, msg or "expected to fail"))
-
-
-@decorator
-def future(fn, *arg):
- return fails_if(LambdaPredicate(fn), "Future feature")
-
-
-def fails_on(db, reason=None):
- return fails_if(SpecPredicate(db), reason)
-
-
-def fails_on_everything_except(*dbs):
- return succeeds_if(OrPredicate([Predicate.as_predicate(db) for db in dbs]))
-
-
-def skip(db, reason=None):
- return skip_if(SpecPredicate(db), reason)
-
-
-def only_on(dbs, reason=None):
- return only_if(
- OrPredicate([Predicate.as_predicate(db) for db in util.to_list(dbs)])
- )
-
-
-def exclude(db, op, spec, reason=None):
- return skip_if(SpecPredicate(db, op, spec), reason)
-
-
-def against(config, *queries):
- assert queries, "no queries sent!"
- return OrPredicate([Predicate.as_predicate(query) for query in queries])(
- config
- )
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import text
+from sqlalchemy.testing import config
+from sqlalchemy.testing import mock
+from sqlalchemy.testing.assertions import eq_
from sqlalchemy.testing.fixtures import TestBase # noqa
import alembic
-from . import config
-from . import mock
from .assertions import _get_dialect
-from .assertions import eq_
from ..environment import EnvironmentContext
from ..migration import MigrationContext
from ..operations import Operations
+++ /dev/null
-# testing/mock.py
-# Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-
-"""Import stub for mock library.
-
- .. note::
-
- copied/adapted from SQLAlchemy master for backwards compatibility;
- this should be removable when Alembic targets SQLAlchemy 1.0.0
-
-"""
-from __future__ import absolute_import
-
-from ..util.compat import py3k
-
-if py3k:
- from unittest.mock import MagicMock, Mock, call, patch, ANY
-else:
- try:
- from mock import MagicMock, Mock, call, patch, ANY # noqa
- except ImportError:
- raise ImportError(
- "SQLAlchemy's test suite requires the "
- "'mock' library as of 0.8.2."
- )
+++ /dev/null
-"""NOTE: copied/adapted from SQLAlchemy master for backwards compatibility;
- this should be removable when Alembic targets SQLAlchemy 1.0.0
-"""
+++ /dev/null
-"""
-Bootstrapper for test framework plugins.
-
-The entire rationale for this system is to get the modules in plugin/
-imported without importing all of the supporting library, so that we can
-set up things for testing before coverage starts.
-
-The rationale for all of plugin/ being *in* the supporting library in the
-first place is so that the testing and plugin suite is available to other
-libraries, mainly external SQLAlchemy and Alembic dialects, to make use
-of the same test environment and standard suites available to
-SQLAlchemy/Alembic themselves without the need to ship/install a separate
-package outside of SQLAlchemy.
-
-NOTE: copied/adapted from SQLAlchemy master for backwards compatibility;
-this should be removable when Alembic targets SQLAlchemy 1.0.0.
-
-"""
-
-import os
-import sys
-
-bootstrap_file = locals()["bootstrap_file"]
-to_bootstrap = locals()["to_bootstrap"]
-
-
-def load_file_as_module(name):
- path = os.path.join(os.path.dirname(bootstrap_file), "%s.py" % name)
- if sys.version_info.major >= 3:
- from importlib import machinery
-
- mod = machinery.SourceFileLoader(name, path).load_module()
- else:
- import imp
-
- mod = imp.load_source(name, path)
- return mod
-
-
-if to_bootstrap == "pytest":
- sys.modules["alembic_plugin_base"] = load_file_as_module("plugin_base")
- sys.modules["alembic_pytestplugin"] = load_file_as_module("pytestplugin")
-else:
- raise Exception("unknown bootstrap: %s" % to_bootstrap) # noqa
+++ /dev/null
-# plugin/plugin_base.py
-# Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""Testing extensions.
-
-this module is designed to work as a testing-framework-agnostic library,
-however it currenty targets only py.test.
-
-
-"""
-
-from __future__ import absolute_import
-
-import re
-import sys
-
-from pytest import skip
-
-SkipTest = skip.Exception
-
-py3k = sys.version_info.major >= 3
-
-if py3k:
- import configparser
-else:
- import ConfigParser as configparser
-
-# late imports
-fixtures = None
-engines = None
-provision = None
-exclusions = None
-warnings = None
-assertions = None
-requirements = None
-config = None
-util = None
-file_config = None
-
-
-logging = None
-include_tags = set()
-exclude_tags = set()
-options = None
-
-
-def setup_options(make_option):
- make_option(
- "--log-info",
- action="callback",
- type="string",
- callback=_log,
- help="turn on info logging for <LOG> (multiple OK)",
- )
- make_option(
- "--log-debug",
- action="callback",
- type="string",
- callback=_log,
- help="turn on debug logging for <LOG> (multiple OK)",
- )
- make_option(
- "--db",
- action="append",
- type="string",
- dest="db",
- help="Use prefab database uri. Multiple OK, "
- "first one is run by default.",
- )
- make_option(
- "--dbs",
- action="callback",
- zeroarg_callback=_list_dbs,
- help="List available prefab dbs",
- )
- make_option(
- "--dburi",
- action="append",
- type="string",
- dest="dburi",
- help="Database uri. Multiple OK, " "first one is run by default.",
- )
- make_option(
- "--dropfirst",
- action="store_true",
- dest="dropfirst",
- help="Drop all tables in the target database first",
- )
- make_option(
- "--backend-only",
- action="store_true",
- dest="backend_only",
- help="Run only tests marked with __backend__",
- )
- make_option(
- "--postgresql-templatedb",
- type="string",
- help="name of template database to use for Postgresql "
- "CREATE DATABASE (defaults to current database)",
- )
- make_option(
- "--low-connections",
- action="store_true",
- dest="low_connections",
- help="Use a low number of distinct connections - "
- "i.e. for Oracle TNS",
- )
- make_option(
- "--write-idents",
- type="string",
- dest="write_idents",
- help="write out generated follower idents to <file>, "
- "when -n<num> is used",
- )
- make_option(
- "--reversetop",
- action="store_true",
- dest="reversetop",
- default=False,
- help="Use a random-ordering set implementation in the ORM "
- "(helps reveal dependency issues)",
- )
- make_option(
- "--requirements",
- action="callback",
- type="string",
- callback=_requirements_opt,
- help="requirements class for testing, overrides setup.cfg",
- )
- make_option(
- "--with-cdecimal",
- action="store_true",
- dest="cdecimal",
- default=False,
- help="Monkeypatch the cdecimal library into Python 'decimal' "
- "for all tests",
- )
- make_option(
- "--include-tag",
- action="callback",
- callback=_include_tag,
- type="string",
- help="Include tests with tag <tag>",
- )
- make_option(
- "--exclude-tag",
- action="callback",
- callback=_exclude_tag,
- type="string",
- help="Exclude tests with tag <tag>",
- )
- make_option(
- "--mysql-engine",
- action="store",
- dest="mysql_engine",
- default=None,
- help="Use the specified MySQL storage engine for all tables, "
- "default is a db-default/InnoDB combo.",
- )
-
-
-def configure_follower(follower_ident):
- """Configure required state for a follower.
-
- This invokes in the parent process and typically includes
- database creation.
-
- """
- from alembic.testing import provision
-
- provision.FOLLOWER_IDENT = follower_ident
-
-
-def memoize_important_follower_config(dict_):
- """Store important configuration we will need to send to a follower.
-
- This invokes in the parent process after normal config is set up.
-
- This is necessary as py.test seems to not be using forking, so we
- start with nothing in memory, *but* it isn't running our argparse
- callables, so we have to just copy all of that over.
-
- """
- dict_["memoized_config"] = {
- "include_tags": include_tags,
- "exclude_tags": exclude_tags,
- }
-
-
-def restore_important_follower_config(dict_):
- """Restore important configuration needed by a follower.
-
- This invokes in the follower process.
-
- """
- include_tags.update(dict_["memoized_config"]["include_tags"])
- exclude_tags.update(dict_["memoized_config"]["exclude_tags"])
-
-
-def read_config():
- global file_config
- file_config = configparser.ConfigParser()
- file_config.read(["setup.cfg", "test.cfg"])
-
-
-def pre_begin(opt):
- """things to set up early, before coverage might be setup."""
- global options
- options = opt
- for fn in pre_configure:
- fn(options, file_config)
-
-
-def set_coverage_flag(value):
- options.has_coverage = value
-
-
-def post_begin():
- """things to set up later, once we know coverage is running."""
-
- # Lazy setup of other options (post coverage)
- for fn in post_configure:
- fn(options, file_config)
-
- global util, fixtures, engines, exclusions, assertions
- global warnings, profiling, config, testing
- from alembic.testing import config, warnings, exclusions # noqa
- from alembic.testing import engines, fixtures # noqa
- from sqlalchemy import util # noqa
-
- warnings.setup_filters()
-
-
-def _log(opt_str, value, parser):
- global logging
- if not logging:
- import logging
-
- logging.basicConfig()
-
- if opt_str.endswith("-info"):
- logging.getLogger(value).setLevel(logging.INFO)
- elif opt_str.endswith("-debug"):
- logging.getLogger(value).setLevel(logging.DEBUG)
-
-
-def _list_dbs(*args):
- print("Available --db options (use --dburi to override)")
- for macro in sorted(file_config.options("db")):
- print("%20s\t%s" % (macro, file_config.get("db", macro)))
- sys.exit(0)
-
-
-def _requirements_opt(opt_str, value, parser):
- _setup_requirements(value)
-
-
-def _exclude_tag(opt_str, value, parser):
- exclude_tags.add(value.replace("-", "_"))
-
-
-def _include_tag(opt_str, value, parser):
- include_tags.add(value.replace("-", "_"))
-
-
-pre_configure = []
-post_configure = []
-
-
-def pre(fn):
- pre_configure.append(fn)
- return fn
-
-
-def post(fn):
- post_configure.append(fn)
- return fn
-
-
-@pre
-def _setup_options(opt, file_config):
- global options
- options = opt
-
-
-@pre
-def _monkeypatch_cdecimal(options, file_config):
- if options.cdecimal:
- import cdecimal
-
- sys.modules["decimal"] = cdecimal
-
-
-@post
-def _engine_uri(options, file_config):
- from alembic.testing import config
- from alembic.testing import provision
-
- if options.dburi:
- db_urls = list(options.dburi)
- else:
- db_urls = []
-
- if options.db:
- for db_token in options.db:
- for db in re.split(r"[,\s]+", db_token):
- if db not in file_config.options("db"):
- raise RuntimeError(
- "Unknown URI specifier '%s'. "
- "Specify --dbs for known uris." % db
- )
- else:
- db_urls.append(file_config.get("db", db))
-
- if not db_urls:
- db_urls.append(file_config.get("db", "default"))
-
- for db_url in db_urls:
-
- if options.write_idents and provision.FOLLOWER_IDENT: # != 'master':
- with open(options.write_idents, "a") as file_:
- file_.write(provision.FOLLOWER_IDENT + " " + db_url + "\n")
-
- cfg = provision.setup_config(
- db_url, options, file_config, provision.FOLLOWER_IDENT
- )
-
- if not config._current:
- cfg.set_as_current(cfg)
-
-
-@post
-def _requirements(options, file_config):
-
- requirement_cls = file_config.get("sqla_testing", "requirement_cls")
- _setup_requirements(requirement_cls)
-
-
-def _setup_requirements(argument):
- from alembic.testing import config
-
- if config.requirements is not None:
- return
-
- modname, clsname = argument.split(":")
-
- # importlib.import_module() only introduced in 2.7, a little
- # late
- mod = __import__(modname)
- for component in modname.split(".")[1:]:
- mod = getattr(mod, component)
- req_cls = getattr(mod, clsname)
-
- config.requirements = req_cls()
-
-
-@post
-def _prep_testing_database(options, file_config):
- from alembic.testing import config
- from alembic.testing.exclusions import against
- from sqlalchemy import schema
- from sqlalchemy import inspect
-
- if options.dropfirst:
- for cfg in config.Config.all_configs():
- e = cfg.db
- inspector = inspect(e)
- try:
- view_names = inspector.get_view_names()
- except NotImplementedError:
- pass
- else:
- for vname in view_names:
- e.execute(
- schema._DropView(
- schema.Table(vname, schema.MetaData())
- )
- )
-
- if config.requirements.schemas.enabled_for_config(cfg):
- try:
- view_names = inspector.get_view_names(schema="test_schema")
- except NotImplementedError:
- pass
- else:
- for vname in view_names:
- e.execute(
- schema._DropView(
- schema.Table(
- vname,
- schema.MetaData(),
- schema="test_schema",
- )
- )
- )
-
- for tname in reversed(
- inspector.get_table_names(order_by="foreign_key")
- ):
- e.execute(
- schema.DropTable(schema.Table(tname, schema.MetaData()))
- )
-
- if config.requirements.schemas.enabled_for_config(cfg):
- for tname in reversed(
- inspector.get_table_names(
- order_by="foreign_key", schema="test_schema"
- )
- ):
- e.execute(
- schema.DropTable(
- schema.Table(
- tname, schema.MetaData(), schema="test_schema"
- )
- )
- )
-
- if against(cfg, "postgresql"):
- from sqlalchemy.dialects import postgresql
-
- for enum in inspector.get_enums("*"):
- e.execute(
- postgresql.DropEnumType(
- postgresql.ENUM(
- name=enum["name"], schema=enum["schema"]
- )
- )
- )
-
-
-@post
-def _reverse_topological(options, file_config):
- if options.reversetop:
- from sqlalchemy.orm.util import randomize_unitofwork
-
- randomize_unitofwork()
-
-
-@post
-def _post_setup_options(opt, file_config):
- from alembic.testing import config
-
- config.options = options
- config.file_config = file_config
-
-
-def want_class(cls):
- if not issubclass(cls, fixtures.TestBase):
- return False
- elif cls.__name__.startswith("_"):
- return False
- elif config.options.backend_only and not getattr(
- cls, "__backend__", False
- ):
- return False
- else:
- return True
-
-
-def want_method(cls, fn):
- if not fn.__name__.startswith("test_"):
- return False
- elif fn.__module__ is None:
- return False
- elif include_tags:
- return (
- hasattr(cls, "__tags__")
- and exclusions.tags(cls.__tags__).include_test(
- include_tags, exclude_tags
- )
- ) or (
- hasattr(fn, "_sa_exclusion_extend")
- and fn._sa_exclusion_extend.include_test(
- include_tags, exclude_tags
- )
- )
- elif exclude_tags and hasattr(cls, "__tags__"):
- return exclusions.tags(cls.__tags__).include_test(
- include_tags, exclude_tags
- )
- elif exclude_tags and hasattr(fn, "_sa_exclusion_extend"):
- return fn._sa_exclusion_extend.include_test(include_tags, exclude_tags)
- else:
- return True
-
-
-def generate_sub_tests(cls, module):
- if getattr(cls, "__backend__", False):
- for cfg in _possible_configs_for_cls(cls):
- orig_name = cls.__name__
-
- # we can have special chars in these names except for the
- # pytest junit plugin, which is tripped up by the brackets
- # and periods, so sanitize
-
- alpha_name = re.sub(r"[_\[\]\.]+", "_", cfg.name)
- alpha_name = re.sub("_+$", "", alpha_name)
- name = "%s_%s" % (cls.__name__, alpha_name)
-
- subcls = type(
- name,
- (cls,),
- {"_sa_orig_cls_name": orig_name, "__only_on_config__": cfg},
- )
- setattr(module, name, subcls)
- yield subcls
- else:
- yield cls
-
-
-def start_test_class(cls):
- _do_skips(cls)
- _setup_engine(cls)
-
-
-def stop_test_class(cls):
- # from sqlalchemy import inspect
- # assert not inspect(testing.db).get_table_names()
- _restore_engine()
-
-
-def _restore_engine():
- config._current.reset()
-
-
-def _setup_engine(cls):
- if getattr(cls, "__engine_options__", None):
- eng = engines.testing_engine(options=cls.__engine_options__)
- config._current.push_engine(eng)
-
-
-def before_test(test, test_module_name, test_class, test_name):
- pass
-
-
-def after_test(test):
- pass
-
-
-def _possible_configs_for_cls(cls, reasons=None):
- all_configs = set(config.Config.all_configs())
-
- if cls.__unsupported_on__:
- spec = exclusions.db_spec(*cls.__unsupported_on__)
- for config_obj in list(all_configs):
- if spec(config_obj):
- all_configs.remove(config_obj)
-
- if getattr(cls, "__only_on__", None):
- spec = exclusions.db_spec(*util.to_list(cls.__only_on__))
- for config_obj in list(all_configs):
- if not spec(config_obj):
- all_configs.remove(config_obj)
-
- if getattr(cls, "__only_on_config__", None):
- all_configs.intersection_update([cls.__only_on_config__])
-
- if hasattr(cls, "__requires__"):
- requirements = config.requirements
- for config_obj in list(all_configs):
- for requirement in cls.__requires__:
- check = getattr(requirements, requirement)
-
- skip_reasons = check.matching_config_reasons(config_obj)
- if skip_reasons:
- all_configs.remove(config_obj)
- if reasons is not None:
- reasons.extend(skip_reasons)
- break
-
- if hasattr(cls, "__prefer_requires__"):
- non_preferred = set()
- requirements = config.requirements
- for config_obj in list(all_configs):
- for requirement in cls.__prefer_requires__:
- check = getattr(requirements, requirement)
-
- if not check.enabled_for_config(config_obj):
- non_preferred.add(config_obj)
- if all_configs.difference(non_preferred):
- all_configs.difference_update(non_preferred)
-
- return all_configs
-
-
-def _do_skips(cls):
- reasons = []
- all_configs = _possible_configs_for_cls(cls, reasons)
-
- if getattr(cls, "__skip_if__", False):
- for c in getattr(cls, "__skip_if__"):
- if c():
- raise SkipTest(
- "'%s' skipped by %s" % (cls.__name__, c.__name__)
- )
-
- if not all_configs:
- msg = "'%s' unsupported on any DB implementation %s%s" % (
- cls.__name__,
- ", ".join(
- "'%s(%s)+%s'"
- % (
- config_obj.db.name,
- ".".join(
- str(dig)
- for dig in config_obj.db.dialect.server_version_info
- ),
- config_obj.db.driver,
- )
- for config_obj in config.Config.all_configs()
- ),
- ", ".join(reasons),
- )
- raise SkipTest(msg)
- elif hasattr(cls, "__prefer_backends__"):
- non_preferred = set()
- spec = exclusions.db_spec(*util.to_list(cls.__prefer_backends__))
- for config_obj in all_configs:
- if not spec(config_obj):
- non_preferred.add(config_obj)
- if all_configs.difference(non_preferred):
- all_configs.difference_update(non_preferred)
-
- if config._current not in all_configs:
- _setup_config(all_configs.pop(), cls)
-
-
-def _setup_config(config_obj, ctx):
- config._current.push(config_obj)
+++ /dev/null
-"""NOTE: copied/adapted from SQLAlchemy master for backwards compatibility;
- this should be removable when Alembic targets SQLAlchemy 1.0.0.
-"""
-
-try:
- # installed by bootstrap.py
- import alembic_plugin_base as plugin_base
-except ImportError:
- # assume we're a package, use traditional import
- from . import plugin_base
-
-import argparse
-import collections
-import inspect
-import os
-import sys
-
-import pytest
-
-py3k = sys.version_info.major >= 3
-
-try:
- import xdist # noqa
-
- has_xdist = True
-except ImportError:
- has_xdist = False
-
-
-def pytest_addoption(parser):
- group = parser.getgroup("sqlalchemy")
-
- def make_option(name, **kw):
- callback_ = kw.pop("callback", None)
- if callback_:
-
- class CallableAction(argparse.Action):
- def __call__(
- self, parser, namespace, values, option_string=None
- ):
- callback_(option_string, values, parser)
-
- kw["action"] = CallableAction
-
- zeroarg_callback = kw.pop("zeroarg_callback", None)
- if zeroarg_callback:
-
- class CallableAction(argparse.Action):
- def __init__(
- self,
- option_strings,
- dest,
- default=False,
- required=False,
- help=None, # noqa
- ):
- super(CallableAction, self).__init__(
- option_strings=option_strings,
- dest=dest,
- nargs=0,
- const=True,
- default=default,
- required=required,
- help=help,
- )
-
- def __call__(
- self, parser, namespace, values, option_string=None
- ):
- zeroarg_callback(option_string, values, parser)
-
- kw["action"] = CallableAction
-
- group.addoption(name, **kw)
-
- plugin_base.setup_options(make_option)
- plugin_base.read_config()
-
-
-def pytest_configure(config):
- if hasattr(config, "slaveinput"):
- plugin_base.restore_important_follower_config(config.slaveinput)
- plugin_base.configure_follower(config.slaveinput["follower_ident"])
- else:
- if config.option.write_idents and os.path.exists(
- config.option.write_idents
- ):
- os.remove(config.option.write_idents)
-
- plugin_base.pre_begin(config.option)
-
- plugin_base.set_coverage_flag(
- bool(getattr(config.option, "cov_source", False))
- )
-
-
-def pytest_sessionstart(session):
- plugin_base.post_begin()
-
-
-if has_xdist:
- import uuid
-
- def pytest_configure_node(node):
- # the master for each node fills slaveinput dictionary
- # which pytest-xdist will transfer to the subprocess
-
- plugin_base.memoize_important_follower_config(node.slaveinput)
-
- node.slaveinput["follower_ident"] = "test_%s" % uuid.uuid4().hex[0:12]
- from alembic.testing import provision
-
- provision.create_follower_db(node.slaveinput["follower_ident"])
-
- def pytest_testnodedown(node, error):
- from alembic.testing import provision
-
- provision.drop_follower_db(node.slaveinput["follower_ident"])
-
-
-def pytest_collection_modifyitems(session, config, items):
- # look for all those classes that specify __backend__ and
- # expand them out into per-database test cases.
-
- # this is much easier to do within pytest_pycollect_makeitem, however
- # pytest is iterating through cls.__dict__ as makeitem is
- # called which causes a "dictionary changed size" error on py3k.
- # I'd submit a pullreq for them to turn it into a list first, but
- # it's to suit the rather odd use case here which is that we are adding
- # new classes to a module on the fly.
-
- rebuilt_items = collections.defaultdict(list)
- items[:] = [
- item for item in items if isinstance(item.parent, pytest.Instance)
- ]
- test_classes = set(item.parent for item in items)
- for test_class in test_classes:
- for sub_cls in plugin_base.generate_sub_tests(
- test_class.cls, test_class.parent.module
- ):
- if sub_cls is not test_class.cls:
- list_ = rebuilt_items[test_class.cls]
-
- for inst in pytest.Class(
- sub_cls.__name__, parent=test_class.parent.parent
- ).collect():
- list_.extend(inst.collect())
-
- newitems = []
- for item in items:
- if item.parent.cls in rebuilt_items:
- newitems.extend(rebuilt_items[item.parent.cls])
- rebuilt_items[item.parent.cls][:] = []
- else:
- newitems.append(item)
-
- # seems like the functions attached to a test class aren't sorted already?
- # is that true and why's that? (when using unittest, they're sorted)
- items[:] = sorted(
- newitems,
- key=lambda item: (
- item.parent.parent.parent.name,
- item.parent.parent.name,
- item.name,
- ),
- )
-
-
-def pytest_pycollect_makeitem(collector, name, obj):
- if inspect.isclass(obj) and plugin_base.want_class(obj):
- return pytest.Class(name, parent=collector)
- elif (
- inspect.isfunction(obj)
- and isinstance(collector, pytest.Instance)
- and plugin_base.want_method(collector.cls, obj)
- ):
- return pytest.Function(name, parent=collector)
- else:
- return []
-
-
-_current_class = None
-
-
-def pytest_runtest_setup(item):
- # here we seem to get called only based on what we collected
- # in pytest_collection_modifyitems. So to do class-based stuff
- # we have to tear that out.
- global _current_class
-
- if not isinstance(item, pytest.Function):
- return
-
- # ... so we're doing a little dance here to figure it out...
- if _current_class is None:
- class_setup(item.parent.parent)
- _current_class = item.parent.parent
-
- # this is needed for the class-level, to ensure that the
- # teardown runs after the class is completed with its own
- # class-level teardown...
- def finalize():
- global _current_class
- class_teardown(item.parent.parent)
- _current_class = None
-
- item.parent.parent.addfinalizer(finalize)
-
- test_setup(item)
-
-
-def pytest_runtest_teardown(item):
- # ...but this works better as the hook here rather than
- # using a finalizer, as the finalizer seems to get in the way
- # of the test reporting failures correctly (you get a bunch of
- # py.test assertion stuff instead)
- test_teardown(item)
-
-
-def test_setup(item):
- plugin_base.before_test(
- item, item.parent.module.__name__, item.parent.cls, item.name
- )
-
-
-def test_teardown(item):
- plugin_base.after_test(item)
-
-
-def class_setup(item):
- plugin_base.start_test_class(item.cls)
-
-
-def class_teardown(item):
- plugin_base.stop_test_class(item.cls)
+++ /dev/null
-"""NOTE: copied/adapted from SQLAlchemy master for backwards compatibility;
- this should be removable when Alembic targets SQLAlchemy 1.0.0
-"""
-import collections
-import logging
-import os
-import time
-
-from sqlalchemy import create_engine
-from sqlalchemy import exc
-from sqlalchemy import text
-from sqlalchemy.engine import url as sa_url
-
-from . import config
-from . import engines
-from .compat import get_url_backend_name
-from ..util import compat
-
-log = logging.getLogger(__name__)
-
-FOLLOWER_IDENT = None
-
-
-class register(object):
- def __init__(self):
- self.fns = {}
-
- @classmethod
- def init(cls, fn):
- return register().for_db("*")(fn)
-
- def for_db(self, dbname):
- def decorate(fn):
- self.fns[dbname] = fn
- return self
-
- return decorate
-
- def __call__(self, cfg, *arg):
- if isinstance(cfg, compat.string_types):
- url = sa_url.make_url(cfg)
- elif isinstance(cfg, sa_url.URL):
- url = cfg
- else:
- url = cfg.db.url
- backend = get_url_backend_name(url)
- if backend in self.fns:
- return self.fns[backend](cfg, *arg)
- else:
- return self.fns["*"](cfg, *arg)
-
-
-def create_follower_db(follower_ident):
-
- for cfg in _configs_for_db_operation():
- _create_db(cfg, cfg.db, follower_ident)
-
-
-def configure_follower(follower_ident):
- for cfg in config.Config.all_configs():
- _configure_follower(cfg, follower_ident)
-
-
-def setup_config(db_url, options, file_config, follower_ident):
- if follower_ident:
- db_url = _follower_url_from_main(db_url, follower_ident)
- db_opts = {}
- _update_db_opts(db_url, db_opts)
- eng = engines.testing_engine(db_url, db_opts)
- _post_configure_engine(db_url, eng, follower_ident)
- eng.connect().close()
-
- cfg = config.Config.register(eng, db_opts, options, file_config)
- if follower_ident:
- _configure_follower(cfg, follower_ident)
- return cfg
-
-
-def drop_follower_db(follower_ident):
- for cfg in _configs_for_db_operation():
- _drop_db(cfg, cfg.db, follower_ident)
-
-
-def _configs_for_db_operation():
- hosts = set()
-
- for cfg in config.Config.all_configs():
- cfg.db.dispose()
-
- for cfg in config.Config.all_configs():
- url = cfg.db.url
- backend = get_url_backend_name(url)
- host_conf = (backend, url.username, url.host, url.database)
-
- if host_conf not in hosts:
- yield cfg
- hosts.add(host_conf)
-
- for cfg in config.Config.all_configs():
- cfg.db.dispose()
-
-
-@register.init
-def _create_db(cfg, eng, ident):
- raise NotImplementedError("no DB creation routine for cfg: %s" % eng.url)
-
-
-@register.init
-def _drop_db(cfg, eng, ident):
- raise NotImplementedError("no DB drop routine for cfg: %s" % eng.url)
-
-
-@register.init
-def _update_db_opts(db_url, db_opts):
- pass
-
-
-@register.init
-def _configure_follower(cfg, ident):
- pass
-
-
-@register.init
-def _post_configure_engine(url, engine, follower_ident):
- pass
-
-
-@register.init
-def _follower_url_from_main(url, ident):
- url = sa_url.make_url(url)
- url.database = ident
- return url
-
-
-@_update_db_opts.for_db("mssql")
-def _mssql_update_db_opts(db_url, db_opts):
- db_opts["legacy_schema_aliasing"] = False
-
-
-@_follower_url_from_main.for_db("sqlite")
-def _sqlite_follower_url_from_main(url, ident):
- url = sa_url.make_url(url)
- if not url.database or url.database == ":memory:":
- return url
- else:
- return sa_url.make_url("sqlite:///%s.db" % ident)
-
-
-@_post_configure_engine.for_db("sqlite")
-def _sqlite_post_configure_engine(url, engine, follower_ident):
- from sqlalchemy import event
-
- @event.listens_for(engine, "connect")
- def connect(dbapi_connection, connection_record):
- # use file DBs in all cases, memory acts kind of strangely
- # as an attached
- if not follower_ident:
- dbapi_connection.execute(
- 'ATTACH DATABASE "test_schema.db" AS test_schema'
- )
- else:
- dbapi_connection.execute(
- 'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
- % follower_ident
- )
-
-
-@_create_db.for_db("postgresql")
-def _pg_create_db(cfg, eng, ident):
- template_db = cfg.options.postgresql_templatedb
-
- with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
- try:
- _pg_drop_db(cfg, conn, ident)
- except Exception:
- pass
- if not template_db:
- template_db = conn.scalar("select current_database()")
-
- attempt = 0
- while True:
- try:
- conn.execute(
- "CREATE DATABASE %s TEMPLATE %s" % (ident, template_db)
- )
- except exc.OperationalError as err:
- attempt += 1
- if attempt >= 3:
- raise
- if "accessed by other users" in str(err):
- log.info(
- "Waiting to create %s, URI %r, "
- "template DB %s is in use sleeping for .5",
- ident,
- eng.url,
- template_db,
- )
- time.sleep(0.5)
- else:
- break
-
-
-@_create_db.for_db("mysql")
-def _mysql_create_db(cfg, eng, ident):
- with eng.connect() as conn:
- try:
- _mysql_drop_db(cfg, conn, ident)
- except Exception:
- pass
- conn.execute("CREATE DATABASE %s" % ident)
- conn.execute("CREATE DATABASE %s_test_schema" % ident)
- conn.execute("CREATE DATABASE %s_test_schema_2" % ident)
-
-
-@_configure_follower.for_db("mysql")
-def _mysql_configure_follower(config, ident):
- config.test_schema = "%s_test_schema" % ident
- config.test_schema_2 = "%s_test_schema_2" % ident
-
-
-@_create_db.for_db("sqlite")
-def _sqlite_create_db(cfg, eng, ident):
- pass
-
-
-@_drop_db.for_db("postgresql")
-def _pg_drop_db(cfg, eng, ident):
- with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
- conn.execute(
- text(
- "select pg_terminate_backend(pid) from pg_stat_activity "
- "where usename=current_user and pid != pg_backend_pid() "
- "and datname=:dname"
- ),
- dname=ident,
- )
- conn.execute("DROP DATABASE %s" % ident)
-
-
-@_drop_db.for_db("sqlite")
-def _sqlite_drop_db(cfg, eng, ident):
- if ident:
- os.remove("%s_test_schema.db" % ident)
- else:
- os.remove("%s.db" % ident)
-
-
-@_drop_db.for_db("mysql")
-def _mysql_drop_db(cfg, eng, ident):
- with eng.connect() as conn:
- conn.execute("DROP DATABASE %s_test_schema" % ident)
- conn.execute("DROP DATABASE %s_test_schema_2" % ident)
- conn.execute("DROP DATABASE %s" % ident)
-
-
-@_create_db.for_db("oracle")
-def _oracle_create_db(cfg, eng, ident):
- # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
- # similar, so that the default tablespace is not "system"; reflection will
- # fail otherwise
- with eng.connect() as conn:
- conn.execute("create user %s identified by xe" % ident)
- conn.execute("create user %s_ts1 identified by xe" % ident)
- conn.execute("create user %s_ts2 identified by xe" % ident)
- conn.execute("grant dba to %s" % (ident,))
- conn.execute("grant unlimited tablespace to %s" % ident)
- conn.execute("grant unlimited tablespace to %s_ts1" % ident)
- conn.execute("grant unlimited tablespace to %s_ts2" % ident)
-
-
-@_configure_follower.for_db("oracle")
-def _oracle_configure_follower(config, ident):
- config.test_schema = "%s_ts1" % ident
- config.test_schema_2 = "%s_ts2" % ident
-
-
-def _ora_drop_ignore(conn, dbname):
- try:
- conn.execute("drop user %s cascade" % dbname)
- log.info("Reaped db: %s" % dbname)
- return True
- except exc.DatabaseError as err:
- log.warning("couldn't drop db: %s" % err)
- return False
-
-
-@_drop_db.for_db("oracle")
-def _oracle_drop_db(cfg, eng, ident):
- with eng.connect() as conn:
- # cx_Oracle seems to occasionally leak open connections when a large
- # suite it run, even if we confirm we have zero references to
- # connection objects.
- # while there is a "kill session" command in Oracle,
- # it unfortunately does not release the connection sufficiently.
- _ora_drop_ignore(conn, ident)
- _ora_drop_ignore(conn, "%s_ts1" % ident)
- _ora_drop_ignore(conn, "%s_ts2" % ident)
-
-
-def reap_oracle_dbs(idents_file):
- log.info("Reaping Oracle dbs...")
-
- urls = collections.defaultdict(list)
- with open(idents_file) as file_:
- for line in file_:
- line = line.strip()
- db_name, db_url = line.split(" ")
- urls[db_url].append(db_name)
-
- for url in urls:
- if not url.startswith("oracle"):
- continue
- idents = urls[url]
- log.info("db reaper connecting to %r", url)
- eng = create_engine(url)
- with eng.connect() as conn:
-
- log.info("identifiers in file: %s", ", ".join(idents))
-
- to_reap = conn.execute(
- "select u.username from all_users u where username "
- "like 'TEST_%' and not exists (select username "
- "from v$session where username=u.username)"
- )
- all_names = set(username.lower() for (username,) in to_reap)
- to_drop = set()
- for name in all_names:
- if name.endswith("_ts1") or name.endswith("_ts2"):
- continue
- elif name in idents:
- to_drop.add(name)
- if "%s_ts1" % name in all_names:
- to_drop.add("%s_ts1" % name)
- if "%s_ts2" % name in all_names:
- to_drop.add("%s_ts2" % name)
-
- dropped = total = 0
- for total, username in enumerate(to_drop, 1):
- if _ora_drop_ignore(conn, username):
- dropped += 1
- log.info(
- "Dropped %d out of %d stale databases detected", dropped, total
- )
-
-
-@_follower_url_from_main.for_db("oracle")
-def _oracle_follower_url_from_main(url, ident):
- url = sa_url.make_url(url)
- url.username = ident
- url.password = "xe"
- return url
import sys
+from sqlalchemy.testing import exclusions
from sqlalchemy.testing.requirements import Requirements
from alembic import util
from alembic.util import sqla_compat
-from . import exclusions
class SuiteRequirements(Requirements):
+++ /dev/null
-from sqlalchemy.util import decorator
-
-
-@decorator
-def provide_metadata(fn, *args, **kw):
- """Provide bound MetaData for a single test, dropping afterwards."""
-
- from . import config
- from sqlalchemy import schema
-
- metadata = schema.MetaData(config.db)
- self = args[0]
- prev_meta = getattr(self, "metadata", None)
- self.metadata = metadata
- try:
- return fn(*args, **kw)
- finally:
- metadata.drop_all()
- self.metadata = prev_meta
+++ /dev/null
-# testing/warnings.py
-# Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""NOTE: copied/adapted from SQLAlchemy master for backwards compatibility;
- this should be removable when Alembic targets SQLAlchemy 0.9.4.
-"""
-
-from __future__ import absolute_import
-
-import re
-import warnings
-
-from sqlalchemy import exc as sa_exc
-
-
-def setup_filters():
- """Set global warning behavior for the test suite."""
- warnings.filterwarnings(
- "ignore", category=sa_exc.SAPendingDeprecationWarning
- )
- warnings.filterwarnings("error", category=sa_exc.SADeprecationWarning)
- warnings.filterwarnings("error", category=sa_exc.SAWarning)
- warnings.filterwarnings("error", category=DeprecationWarning)
-
- # temporary to allow SQLAlchemy 1.1 to pass under python 3
- warnings.filterwarnings(
- "ignore", category=DeprecationWarning, message=".*formatargspec"
- )
-
-
-def assert_warnings(fn, warning_msgs, regex=False):
- """Assert that each of the given warnings are emitted by fn."""
-
- from .assertions import eq_
-
- with warnings.catch_warnings(record=True) as log:
- # ensure that nothing is going into __warningregistry__
- warnings.filterwarnings("always")
-
- result = fn()
- for warning in log:
- popwarn = warning_msgs.pop(0)
- if regex:
- assert re.match(popwarn, str(warning.message))
- else:
- eq_(popwarn, str(warning.message))
- return result
def warn(msg, stacklevel=2):
- warnings.warn(msg, stacklevel=stacklevel)
+ warnings.warn(msg, UserWarning, stacklevel=stacklevel)
def msg(msg, newline=True):
--- /dev/null
+.. change::
+ :tags: change, internals
+
+ The test suite for Alembic now makes use of SQLAlchemy's testing framework
+ directly. Previously, Alembic had its own version of this framework that
+ was mostly copied from that of SQLAlchemy to enable testing with older
+ SQLAlchemy versions. The majority of this code is now removed so that both
+ projects can leverage improvements from a common testing framework.
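+
+    For example, names that were previously defined in Alembic's local copy
+    of the framework are now imported straight from ``sqlalchemy.testing``,
+    as in the imports added by this change::
+
+        from sqlalchemy.testing import config, engines, exclusions, mock
+        from sqlalchemy.testing import provide_metadata
+        from sqlalchemy.testing.config import requirements as requires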
--- /dev/null
+"""Drop Oracle, SQL Server databases that are left over from a
+multiprocessing test run.
+
+Currently the cx_Oracle driver seems to sometimes not release a
+TCP connection even if close() is called, which prevents the provisioning
+system from dropping a database in-process.
+
+For SQL Server, databases remain in use after the tests run, and killing
+all detected sessions does not seem to release the database in-process.
+
+"""
+import logging
+import sys
+
+from sqlalchemy.testing import provision
+
+
+logging.basicConfig()
+logging.getLogger(provision.__name__).setLevel(logging.INFO)
+
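+# sys.argv[1] is the idents file written during the test run via the
+# --write-idents option (db_idents.txt in tox.ini); use SQLAlchemy's
+# reap_dbs() when available, falling back to the Oracle-only
+# reap_oracle_dbs() on versions that don't provide it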
+if hasattr(provision, "reap_dbs"):
+ provision.reap_dbs(sys.argv[1])
+else:
+ provision.reap_oracle_dbs(sys.argv[1])
+++ /dev/null
-"""Drop Oracle databases that are left over from a
-multiprocessing test run.
-
-Currently the cx_Oracle driver seems to sometimes not release a
-TCP connection even if close() is called, which prevents the provisioning
-system from dropping a database in-process.
-
-"""
-from alembic.testing import provision
-import logging
-import sys
-
-logging.basicConfig()
-logging.getLogger(provision.__name__).setLevel(logging.INFO)
-
-provision.reap_oracle_dbs(sys.argv[1])
-
-
"""
import os
-# use bootstrapping so that test plugins are loaded
-# without touching the main library before coverage starts
+import sqlalchemy
+
+# ideally, SQLAlchemy would allow us to just import bootstrap,
+# but for now we have to use its "load from a file" approach
+
bootstrap_file = os.path.join(
- os.path.dirname(__file__),
- "..",
- "alembic",
- "testing",
- "plugin",
- "bootstrap.py",
+ os.path.dirname(sqlalchemy.__file__), "testing", "plugin", "bootstrap.py"
)
with open(bootstrap_file) as f:
-from alembic.testing import exclusions
+from sqlalchemy.testing import exclusions
+
from alembic.testing.requirements import SuiteRequirements
from alembic.util import sqla_compat
from alembic.testing import eq_
from alembic.testing import is_
from alembic.testing import is_not_
+from alembic.testing import mock
from alembic.testing import TestBase
-from alembic.testing.mock import Mock
from alembic.util import CommandError
from ._autogen_fixtures import AutogenFixtureTest
from ._autogen_fixtures import AutogenTest
return m
def test_uses_custom_compare_type_function(self):
- my_compare_type = Mock()
+ my_compare_type = mock.Mock()
self.context._user_compare_type = my_compare_type
uo = ops.UpgradeOps(ops=[])
eq_(type(inspected_type), INTEGER)
def test_column_type_not_modified_custom_compare_type_returns_False(self):
- my_compare_type = Mock()
+ my_compare_type = mock.Mock()
my_compare_type.return_value = False
self.context._user_compare_type = my_compare_type
eq_(diffs, [])
def test_column_type_modified_custom_compare_type_returns_True(self):
- my_compare_type = Mock()
+ my_compare_type = mock.Mock()
my_compare_type.return_value = True
self.context._user_compare_type = my_compare_type
from alembic.testing import config
from alembic.testing import eq_
from alembic.testing import eq_ignore_whitespace
+from alembic.testing import mock
from alembic.testing import TestBase
from alembic.testing.fixtures import op_fixture
-from alembic.testing.mock import patch
from alembic.util import compat
py3k = sys.version_info >= (3,)
"SomeCustomConstraint(Column('id', Integer(), table=<t>))])",
)
- @patch("alembic.autogenerate.render.MAX_PYTHON_ARGS", 3)
+ @mock.patch("alembic.autogenerate.render.MAX_PYTHON_ARGS", 3)
def test_render_table_max_cols(self):
m = MetaData()
t = Table(
from alembic.script import ScriptDirectory
from alembic.testing import assert_raises_message
from alembic.testing import eq_
+from alembic.testing import mock
from alembic.testing.env import _no_sql_testing_config
from alembic.testing.env import _write_config_file
from alembic.testing.env import clear_staging_env
from alembic.testing.env import staging_env
from alembic.testing.fixtures import capture_db
from alembic.testing.fixtures import TestBase
-from alembic.testing.mock import call
-from alembic.testing.mock import Mock
from alembic.util import compat
)
def test_attributes_attr(self):
- m1 = Mock()
+ m1 = mock.Mock()
cfg = config.Config()
cfg.attributes["connection"] = m1
eq_(cfg.attributes["connection"], m1)
def test_attributes_construtor(self):
- m1 = Mock()
- m2 = Mock()
+ m1 = mock.Mock()
+ m2 = mock.Mock()
cfg = config.Config(attributes={"m1": m1})
cfg.attributes["connection"] = m2
eq_(cfg.attributes, {"m1": m1, "connection": m2})
class StdoutOutputEncodingTest(TestBase):
def test_plain(self):
- stdout = Mock(encoding="latin-1")
+ stdout = mock.Mock(encoding="latin-1")
cfg = config.Config(stdout=stdout)
cfg.print_stdout("test %s %s", "x", "y")
- eq_(stdout.mock_calls, [call.write("test x y"), call.write("\n")])
+ eq_(
+ stdout.mock_calls,
+ [mock.call.write("test x y"), mock.call.write("\n")],
+ )
def test_utf8_unicode(self):
- stdout = Mock(encoding="latin-1")
+ stdout = mock.Mock(encoding="latin-1")
cfg = config.Config(stdout=stdout)
cfg.print_stdout(compat.u("méil %s %s"), "x", "y")
eq_(
stdout.mock_calls,
- [call.write(compat.u("méil x y")), call.write("\n")],
+ [mock.call.write(compat.u("méil x y")), mock.call.write("\n")],
)
def test_ascii_unicode(self):
- stdout = Mock(encoding=None)
+ stdout = mock.Mock(encoding=None)
cfg = config.Config(stdout=stdout)
cfg.print_stdout(compat.u("méil %s %s"), "x", "y")
- eq_(stdout.mock_calls, [call.write("m?il x y"), call.write("\n")])
+ eq_(
+ stdout.mock_calls,
+ [mock.call.write("m?il x y"), mock.call.write("\n")],
+ )
def test_only_formats_output_with_args(self):
- stdout = Mock(encoding=None)
+ stdout = mock.Mock(encoding=None)
cfg = config.Config(stdout=stdout)
cfg.print_stdout(compat.u("test 3%"))
- eq_(stdout.mock_calls, [call.write("test 3%"), call.write("\n")])
+ eq_(
+ stdout.mock_calls,
+ [mock.call.write("test 3%"), mock.call.write("\n")],
+ )
class TemplateOutputEncodingTest(TestBase):
from alembic.testing import config
from alembic.testing import eq_
from alembic.testing import is_
+from alembic.testing import mock
from alembic.testing.assertions import expect_warnings
from alembic.testing.env import _no_sql_testing_config
from alembic.testing.env import _sqlite_file_db
from alembic.testing.env import write_script
from alembic.testing.fixtures import capture_context_buffer
from alembic.testing.fixtures import TestBase
-from alembic.testing.mock import call
-from alembic.testing.mock import MagicMock
-from alembic.testing.mock import Mock
class EnvironmentTest(TestBase):
def test_x_arg(self):
env = self._fixture()
- self.cfg.cmd_opts = Mock(x="y=5")
+ self.cfg.cmd_opts = mock.Mock(x="y=5")
eq_(env.get_x_argument(), "y=5")
def test_x_arg_asdict(self):
env = self._fixture()
- self.cfg.cmd_opts = Mock(x=["y=5"])
+ self.cfg.cmd_opts = mock.Mock(x=["y=5"])
eq_(env.get_x_argument(as_dictionary=True), {"y": "5"})
def test_x_arg_no_opts(self):
"""
% a_rev,
)
- migration_fn = MagicMock()
+ migration_fn = mock.MagicMock()
def upgrade(rev, context):
migration_fn(rev, context)
env.run_migrations()
- eq_(migration_fn.mock_calls, [call((), env._migration_context)])
+ eq_(migration_fn.mock_calls, [mock.call((), env._migration_context)])
@classmethod
def setup_class(cls):
cls.bind = config.db
- cls.conn = cls.bind.connect()
staging_env()
- cls.migration_context = MigrationContext.configure(
- connection=cls.conn,
- opts={"compare_type": True, "compare_server_default": True},
- )
def setUp(self):
+ self.conn = self.bind.connect()
+ self.migration_context = MigrationContext.configure(
+ connection=self.conn,
+ opts={"compare_type": True, "compare_server_default": True},
+ )
self.autogen_context = api.AutogenContext(self.migration_context)
+ def tearDown(self):
+ self.conn.close()
+
@classmethod
def teardown_class(cls):
- cls.conn.close()
clear_staging_env()
@provide_metadata
sqlite: SQLITE={env:TOX_SQLITE:--db sqlite}
postgresql: POSTGRESQL={env:TOX_POSTGRESQL:--db postgresql}
mysql: MYSQL={env:TOX_MYSQL:--db mysql}
- oracle: ORACLE={env:TOX_ORACLE:--db oracle} --low-connections --write-idents oracle_idents.txt
+ oracle: ORACLE={env:TOX_ORACLE:--db oracle} --low-connections --write-idents db_idents.txt
mssql: MSSQL={env:TOX_MSSQL:--db pymssql}
pyoptimize: PYTHONOPTIMIZE=1
pyoptimize: LIMITTESTS="tests/test_script_consumption.py"
commands=
{env:BASECOMMAND} {env:WORKERS} {env:SQLITE:} {env:POSTGRESQL:} {env:MYSQL:} {env:ORACLE:} {env:MSSQL:} {env:BACKENDONLY:} {env:COVERAGE:} {env:LIMITTESTS:} {posargs}
- {oracle}: python reap_oracle_dbs.py oracle_idents.txt
+ {oracle,mssql}: python reap_dbs.py db_idents.txt
# thanks to https://julien.danjou.info/the-best-flake8-extensions/