conn_col_default = conn_col.server_default
if conn_col_default is None and metadata_default is None:
return False
+
+ if sqla_compat.has_computed and isinstance(
+ metadata_default, sa_schema.Computed
+ ):
+ # return False in case of a computed column as the server
+ # default. Note that DDL for adding or removing "GENERATED AS" from
+ # an existing column is not currently known for any backend.
+ # Once SQLAlchemy can reflect "GENERATED" as the "computed" element,
+ # we would also want to ignore and/or warn for changes vs. the
+ # metadata (or support backend specific DDL if applicable).
+ return False
+
rendered_metadata_default = _render_server_default_for_compare(
metadata_default, metadata_col, autogen_context
)
if rendered is not False:
return rendered
+ args = []
opts = []
+
if column.server_default:
- rendered = _render_server_default(
- column.server_default, autogen_context
- )
- if rendered:
- opts.append(("server_default", rendered))
+ if sqla_compat._server_default_is_computed(column):
+ rendered = _render_computed(column.computed, autogen_context)
+ if rendered:
+ args.append(rendered)
+ else:
+ rendered = _render_server_default(
+ column.server_default, autogen_context
+ )
+ if rendered:
+ opts.append(("server_default", rendered))
if (
column.autoincrement is not None
opts.append(("comment", "%r" % comment))
# TODO: for non-ascii colname, assign a "key"
- return "%(prefix)sColumn(%(name)r, %(type)s, %(kwargs)s)" % {
+ return "%(prefix)sColumn(%(name)r, %(type)s, %(args)s%(kwargs)s)" % {
"prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
"name": _ident(column.name),
"type": _repr_type(column.type, autogen_context),
+ "args": ", ".join([str(arg) for arg in args]) + ", " if args else "",
"kwargs": (
", ".join(
["%s=%s" % (kwname, val) for kwname, val in opts]
if rendered is not False:
return rendered
- if isinstance(default, sa_schema.DefaultClause):
+ if sqla_compat.has_computed and isinstance(default, sa_schema.Computed):
+ return _render_computed(default, autogen_context)
+ elif isinstance(default, sa_schema.DefaultClause):
if isinstance(default.arg, compat.string_types):
default = default.arg
else:
return default
+def _render_computed(computed, autogen_context):
+ text = _render_potential_expr(
+ computed.sqltext, autogen_context, wrap_in_text=False
+ )
+
+ kwargs = {}
+ if computed.persisted is not None:
+ kwargs["persisted"] = computed.persisted
+ return "%(prefix)sComputed(%(text)s, %(kwargs)s)" % {
+ "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+ "text": text,
+ "kwargs": (", ".join("%s=%s" % pair for pair in kwargs.items())),
+ }
+
+
def _repr_type(type_, autogen_context):
rendered = _user_defined_render("type", type_, autogen_context)
if rendered is not False:
import functools
+from sqlalchemy import exc
from sqlalchemy import Integer
from sqlalchemy import types as sqltypes
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.schema import DDLElement
from sqlalchemy.sql.elements import quoted_name
+from ..util import sqla_compat
from ..util.sqla_compat import _columns_for_constraint # noqa
from ..util.sqla_compat import _find_columns # noqa
from ..util.sqla_compat import _fk_spec # noqa
@compiles(ColumnDefault)
def visit_column_default(element, compiler, **kw):
+ if sqla_compat.has_computed and (
+ isinstance(element.default, sqla_compat.Computed)
+ or isinstance(element.existing_server_default, sqla_compat.Computed)
+ ):
+ raise exc.CompileError(
+ 'Adding or removing a "computed" construct, e.g. GENERATED '
+ "ALWAYS AS, to or from an existing column is not supported."
+ )
+
return "%s %s %s" % (
alter_table(compiler, element.table_name, element.schema),
alter_column(compiler, element.column_name),
from sqlalchemy.testing import config # noqa
+from sqlalchemy.testing import emits_warning # noqa
from sqlalchemy.testing import engines # noqa
-from sqlalchemy.testing import exclusions # noqa
from sqlalchemy.testing import mock # noqa
from sqlalchemy.testing import provide_metadata # noqa
from sqlalchemy.testing.config import requirements as requires # noqa
from alembic import util # noqa
+from . import exclusions # noqa
from .assertions import assert_raises # noqa
from .assertions import assert_raises_message # noqa
from .assertions import emits_python_deprecation_warning # noqa
from .fixture_functions import combinations # noqa
from .fixture_functions import fixture # noqa
from .fixtures import TestBase # noqa
+from .util import resolve_lambda # noqa
--- /dev/null
+# testing/exclusions.py
+# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+
+import contextlib
+import operator
+import re
+
+from sqlalchemy import util as sqla_util
+from sqlalchemy.util import decorator
+
+from . import config
+from . import fixture_functions
+from .. import util
+from ..util.compat import inspect_getargspec
+
+
+def skip_if(predicate, reason=None):
+ rule = compound()
+ pred = _as_predicate(predicate, reason)
+ rule.skips.add(pred)
+ return rule
+
+
+def fails_if(predicate, reason=None):
+ rule = compound()
+ pred = _as_predicate(predicate, reason)
+ rule.fails.add(pred)
+ return rule
+
+
+class compound(object):
+ def __init__(self):
+ self.fails = set()
+ self.skips = set()
+ self.tags = set()
+ self.combinations = {}
+
+ def __add__(self, other):
+ return self.add(other)
+
+ def with_combination(self, **kw):
+ copy = compound()
+ copy.fails.update(self.fails)
+ copy.skips.update(self.skips)
+ copy.tags.update(self.tags)
+ copy.combinations.update((f, kw) for f in copy.fails)
+ copy.combinations.update((s, kw) for s in copy.skips)
+ return copy
+
+ def add(self, *others):
+ copy = compound()
+ copy.fails.update(self.fails)
+ copy.skips.update(self.skips)
+ copy.tags.update(self.tags)
+ for other in others:
+ copy.fails.update(other.fails)
+ copy.skips.update(other.skips)
+ copy.tags.update(other.tags)
+ return copy
+
+ def not_(self):
+ copy = compound()
+ copy.fails.update(NotPredicate(fail) for fail in self.fails)
+ copy.skips.update(NotPredicate(skip) for skip in self.skips)
+ copy.tags.update(self.tags)
+ return copy
+
+ @property
+ def enabled(self):
+ return self.enabled_for_config(config._current)
+
+ def enabled_for_config(self, config):
+ for predicate in self.skips.union(self.fails):
+ if predicate(config):
+ return False
+ else:
+ return True
+
+ def matching_config_reasons(self, config):
+ return [
+ predicate._as_string(config)
+ for predicate in self.skips.union(self.fails)
+ if predicate(config)
+ ]
+
+ def include_test(self, include_tags, exclude_tags):
+ return bool(
+ not self.tags.intersection(exclude_tags)
+ and (not include_tags or self.tags.intersection(include_tags))
+ )
+
+ def _extend(self, other):
+ self.skips.update(other.skips)
+ self.fails.update(other.fails)
+ self.tags.update(other.tags)
+ self.combinations.update(other.combinations)
+
+ def __call__(self, fn):
+ if hasattr(fn, "_sa_exclusion_extend"):
+ fn._sa_exclusion_extend._extend(self)
+ return fn
+
+ @decorator
+ def decorate(fn, *args, **kw):
+ return self._do(config._current, fn, *args, **kw)
+
+ decorated = decorate(fn)
+ decorated._sa_exclusion_extend = self
+ return decorated
+
+ @contextlib.contextmanager
+ def fail_if(self):
+ all_fails = compound()
+ all_fails.fails.update(self.skips.union(self.fails))
+
+ try:
+ yield
+ except Exception as ex:
+ all_fails._expect_failure(config._current, ex, None)
+ else:
+ all_fails._expect_success(config._current, None)
+
+ def _check_combinations(self, combination, predicate):
+ if predicate in self.combinations:
+ for k, v in combination:
+ if (
+ k in self.combinations[predicate]
+ and self.combinations[predicate][k] != v
+ ):
+ return False
+ return True
+
+ def _do(self, cfg, fn, *args, **kw):
+ if len(args) > 1:
+ insp = inspect_getargspec(fn)
+ combination = list(zip(insp.args[1:], args[1:]))
+ else:
+ combination = None
+
+ for skip in self.skips:
+ if self._check_combinations(combination, skip) and skip(cfg):
+ msg = "'%s' : %s" % (
+ fixture_functions.get_current_test_name(),
+ skip._as_string(cfg),
+ )
+ config.skip_test(msg)
+
+ try:
+ return_value = fn(*args, **kw)
+ except Exception as ex:
+ self._expect_failure(cfg, ex, combination, name=fn.__name__)
+ else:
+ self._expect_success(cfg, combination, name=fn.__name__)
+ return return_value
+
+ def _expect_failure(self, config, ex, combination, name="block"):
+ for fail in self.fails:
+ if self._check_combinations(combination, fail) and fail(config):
+ if sqla_util.py2k:
+ str_ex = unicode(ex).encode( # noqa: F821
+ "utf-8", errors="ignore"
+ )
+ else:
+ str_ex = str(ex)
+ print(
+ (
+ "%s failed as expected (%s): %s "
+ % (name, fail._as_string(config), str_ex)
+ )
+ )
+ break
+ else:
+ util.raise_from_cause(ex)
+
+ def _expect_success(self, config, combination, name="block"):
+ if not self.fails:
+ return
+
+ for fail in self.fails:
+ if self._check_combinations(combination, fail) and fail(config):
+ raise AssertionError(
+ "Unexpected success for '%s' (%s)"
+ % (
+ name,
+ " and ".join(
+ fail._as_string(config) for fail in self.fails
+ ),
+ )
+ )
+
+
+def requires_tag(tagname):
+ return tags([tagname])
+
+
+def tags(tagnames):
+ comp = compound()
+ comp.tags.update(tagnames)
+ return comp
+
+
+def only_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+ return skip_if(NotPredicate(predicate), reason)
+
+
+def succeeds_if(predicate, reason=None):
+ predicate = _as_predicate(predicate)
+ return fails_if(NotPredicate(predicate), reason)
+
+
+class Predicate(object):
+ @classmethod
+ def as_predicate(cls, predicate, description=None):
+ if isinstance(predicate, compound):
+ return cls.as_predicate(predicate.enabled_for_config, description)
+ elif isinstance(predicate, Predicate):
+ if description and predicate.description is None:
+ predicate.description = description
+ return predicate
+ elif isinstance(predicate, (list, set)):
+ return OrPredicate(
+ [cls.as_predicate(pred) for pred in predicate], description
+ )
+ elif isinstance(predicate, tuple):
+ return SpecPredicate(*predicate)
+ elif isinstance(predicate, sqla_util.string_types):
+ tokens = re.match(
+ r"([\+\w]+)\s*(?:(>=|==|!=|<=|<|>)\s*([\d\.]+))?", predicate
+ )
+ if not tokens:
+ raise ValueError(
+ "Couldn't locate DB name in predicate: %r" % predicate
+ )
+ db = tokens.group(1)
+ op = tokens.group(2)
+ spec = (
+ tuple(int(d) for d in tokens.group(3).split("."))
+ if tokens.group(3)
+ else None
+ )
+
+ return SpecPredicate(db, op, spec, description=description)
+ elif callable(predicate):
+ return LambdaPredicate(predicate, description)
+ else:
+ assert False, "unknown predicate type: %s" % predicate
+
+ def _format_description(self, config, negate=False):
+ bool_ = self(config)
+ if negate:
+ bool_ = not negate
+ return self.description % {
+ "driver": config.db.url.get_driver_name()
+ if config
+ else "<no driver>",
+ "database": config.db.url.get_backend_name()
+ if config
+ else "<no database>",
+ "doesnt_support": "doesn't support" if bool_ else "does support",
+ "does_support": "does support" if bool_ else "doesn't support",
+ }
+
+ def _as_string(self, config=None, negate=False):
+ raise NotImplementedError()
+
+
+class BooleanPredicate(Predicate):
+ def __init__(self, value, description=None):
+ self.value = value
+ self.description = description or "boolean %s" % value
+
+ def __call__(self, config):
+ return self.value
+
+ def _as_string(self, config, negate=False):
+ return self._format_description(config, negate=negate)
+
+
+class SpecPredicate(Predicate):
+ def __init__(self, db, op=None, spec=None, description=None):
+ self.db = db
+ self.op = op
+ self.spec = spec
+ self.description = description
+
+ _ops = {
+ "<": operator.lt,
+ ">": operator.gt,
+ "==": operator.eq,
+ "!=": operator.ne,
+ "<=": operator.le,
+ ">=": operator.ge,
+ "in": operator.contains,
+ "between": lambda val, pair: val >= pair[0] and val <= pair[1],
+ }
+
+ def __call__(self, config):
+ engine = config.db
+
+ if "+" in self.db:
+ dialect, driver = self.db.split("+")
+ else:
+ dialect, driver = self.db, None
+
+ if dialect and engine.name != dialect:
+ return False
+ if driver is not None and engine.driver != driver:
+ return False
+
+ if self.op is not None:
+ assert driver is None, "DBAPI version specs not supported yet"
+
+ version = _server_version(engine)
+ oper = (
+ hasattr(self.op, "__call__") and self.op or self._ops[self.op]
+ )
+ return oper(version, self.spec)
+ else:
+ return True
+
+ def _as_string(self, config, negate=False):
+ if self.description is not None:
+ return self._format_description(config)
+ elif self.op is None:
+ if negate:
+ return "not %s" % self.db
+ else:
+ return "%s" % self.db
+ else:
+ if negate:
+ return "not %s %s %s" % (self.db, self.op, self.spec)
+ else:
+ return "%s %s %s" % (self.db, self.op, self.spec)
+
+
+class LambdaPredicate(Predicate):
+ def __init__(self, lambda_, description=None, args=None, kw=None):
+ spec = inspect_getargspec(lambda_)
+ if not spec[0]:
+ self.lambda_ = lambda db: lambda_()
+ else:
+ self.lambda_ = lambda_
+ self.args = args or ()
+ self.kw = kw or {}
+ if description:
+ self.description = description
+ elif lambda_.__doc__:
+ self.description = lambda_.__doc__
+ else:
+ self.description = "custom function"
+
+ def __call__(self, config):
+ return self.lambda_(config)
+
+ def _as_string(self, config, negate=False):
+ return self._format_description(config)
+
+
+class NotPredicate(Predicate):
+ def __init__(self, predicate, description=None):
+ self.predicate = predicate
+ self.description = description
+
+ def __call__(self, config):
+ return not self.predicate(config)
+
+ def _as_string(self, config, negate=False):
+ if self.description:
+ return self._format_description(config, not negate)
+ else:
+ return self.predicate._as_string(config, not negate)
+
+
+class OrPredicate(Predicate):
+ def __init__(self, predicates, description=None):
+ self.predicates = predicates
+ self.description = description
+
+ def __call__(self, config):
+ for pred in self.predicates:
+ if pred(config):
+ return True
+ return False
+
+ def _eval_str(self, config, negate=False):
+ if negate:
+ conjunction = " and "
+ else:
+ conjunction = " or "
+ return conjunction.join(
+ p._as_string(config, negate=negate) for p in self.predicates
+ )
+
+ def _negation_str(self, config):
+ if self.description is not None:
+ return "Not " + self._format_description(config)
+ else:
+ return self._eval_str(config, negate=True)
+
+ def _as_string(self, config, negate=False):
+ if negate:
+ return self._negation_str(config)
+ else:
+ if self.description is not None:
+ return self._format_description(config)
+ else:
+ return self._eval_str(config)
+
+
+_as_predicate = Predicate.as_predicate
+
+
+def _is_excluded(db, op, spec):
+ return SpecPredicate(db, op, spec)(config._current)
+
+
+def _server_version(engine):
+ """Return a server_version_info tuple."""
+
+ # force metadata to be retrieved
+ conn = engine.connect()
+ version = getattr(engine.dialect, "server_version_info", None)
+ if version is None:
+ version = ()
+ conn.close()
+ return version
+
+
+def db_spec(*dbs):
+ return OrPredicate([Predicate.as_predicate(db) for db in dbs])
+
+
+def open(): # noqa
+ return skip_if(BooleanPredicate(False, "mark as execute"))
+
+
+def closed():
+ return skip_if(BooleanPredicate(True, "marked as skip"))
+
+
+def fails(reason=None):
+ return fails_if(BooleanPredicate(True, reason or "expected to fail"))
+
+
+@decorator
+def future(fn, *arg):
+ return fails_if(LambdaPredicate(fn), "Future feature")
+
+
+def fails_on(db, reason=None):
+ return fails_if(db, reason)
+
+
+def fails_on_everything_except(*dbs):
+ return succeeds_if(OrPredicate([Predicate.as_predicate(db) for db in dbs]))
+
+
+def skip(db, reason=None):
+ return skip_if(db, reason)
+
+
+def only_on(dbs, reason=None):
+ return only_if(
+ OrPredicate(
+ [Predicate.as_predicate(db, reason) for db in util.to_list(dbs)]
+ )
+ )
+
+
+def exclude(db, op, spec, reason=None):
+ return skip_if(SpecPredicate(db, op, spec), reason)
+
+
+def against(config, *queries):
+ assert queries, "no queries sent!"
+ return OrPredicate([Predicate.as_predicate(query) for query in queries])(
+ config
+ )
ids for parameter sets are derived using an optional template.
"""
- from sqlalchemy.testing import exclusions
+ from alembic.testing import exclusions
if sys.version_info.major == 3:
if len(arg_sets) == 1 and hasattr(arg_sets[0], "__next__"):
comb_fn(getter(arg)) for getter, comb_fn in fns
)
)
- for arg in arg_sets
+ for arg in [
+ (arg,) if not isinstance(arg, tuple) else arg
+ for arg in arg_sets
+ ]
]
else:
# ensure using pytest.param so that even a 1-arg paramset
# still needs to be a tuple. otherwise paramtrize tries to
# interpret a single arg differently than tuple arg
arg_sets = [
- pytest.param(*_filter_exclusions(arg)) for arg in arg_sets
+ pytest.param(*_filter_exclusions(arg))
+ for arg in [
+ (arg,) if not isinstance(arg, tuple) else arg
+ for arg in arg_sets
+ ]
]
def decorate(fn):
import sys
-from sqlalchemy.testing import exclusions
from sqlalchemy.testing.requirements import Requirements
from alembic import util
+from alembic.testing import exclusions
from alembic.util import sqla_compat
@property
def alter_column(self):
return exclusions.open()
+
+ @property
+ def computed_columns(self):
+ return exclusions.closed()
+
+ @property
+ def computed_columns_api(self):
+ return exclusions.only_if(
+ exclusions.BooleanPredicate(sqla_compat.has_computed)
+ )
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
+import types
+
def flag_combinations(*combinations):
"""A facade around @testing.combinations() oriented towards boolean
)
+def resolve_lambda(__fn, **kw):
+ """Given a no-arg lambda and a namespace, return a new lambda that
+ has all the values filled in.
+
+ This is used so that we can have module-level fixtures that
+ refer to instance-level variables using lambdas.
+
+ """
+
+ glb = dict(__fn.__globals__)
+ glb.update(kw)
+ new_fn = types.FunctionType(__fn.__code__, glb)
+ return new_fn()
+
+
def metadata_fixture(ddl="function"):
"""Provide MetaData for a pytest fixture."""
from .sqla_compat import sqla_120 # noqa
from .sqla_compat import sqla_1216 # noqa
from .sqla_compat import sqla_13 # noqa
+from .sqla_compat import has_computed # noqa
if not sqla_110:
sqla_1216 = _vers >= (1, 2, 16)
sqla_13 = _vers >= (1, 3)
sqla_14 = _vers >= (1, 4)
+try:
+ from sqlalchemy import Computed # noqa
+ has_computed = True
+except ImportError:
+ has_computed = False
AUTOINCREMENT_DEFAULT = "auto"
+def _server_default_is_computed(column):
+ if not has_computed:
+ return False
+ else:
+ return isinstance(column.computed, Computed)
+
+
def _table_for_constraint(constraint):
if isinstance(constraint, ForeignKeyConstraint):
return constraint.parent
--- /dev/null
+.. change::
+ :tags: usecase, operations
+ :tickets: 624
+
+ Added support for rendering of "computed" elements on :class:`.Column`
+ objects, supported in SQLAlchemy via the new :class:`.Computed` element
+ introduced in version 1.3.11. Pull request courtesy Federico Caselli.
+
+ Note that there is currently no support for ALTER COLUMN to add, remove, or
+ modify the "GENERATED ALWAYS AS" element from a column; at least for
+ PostgreSQL, it does not seem to be supported by the database. Additionally,
+ SQLAlchemy does not currently reliably reflect the "GENERATED ALWAYS AS"
+ phrase from an existing column, so there is also no autogenerate support
+ for addition or removal of the :class:`.Computed` element to or from an
+ existing column, there is only support for adding new columns that include
+ the :class:`.Computed` element. In the case that the :class:`.Computed`
+ element is removed from the :class:`.Column` object in the table metadata,
+ PostgreSQL and Oracle currently reflect the "GENERATED ALWAYS AS"
+ expression as the "server default" which will produce an op that tries to
+ drop the element as a default.
-from sqlalchemy.testing import exclusions
-
+from alembic.testing import exclusions
from alembic.testing.requirements import SuiteRequirements
from alembic.util import sqla_compat
return exclusions.only_on("postgresql", "mysql")
+ @property
+ def computed_columns(self):
+ # TODO: in theory if these could come from SQLAlchemy dialects
+ # that would be helpful
+ return self.computed_columns_api + exclusions.only_on(
+ ["postgresql >= 12", "oracle", "mssql", "mysql >= 5.7"]
+ )
+
+ @property
+ def computed_reflects_as_server_default(self):
+ # note that this rule will go away when SQLAlchemy correctly
+ # supports reflection of the "computed" construct; the element
+ # will consistently be present as both column.computed and
+ # column.server_default for all supported backends.
+ return self.computed_columns + exclusions.only_if(
+ ["postgresql", "oracle"],
+ "backend reflects computed construct as a server default",
+ )
+
+ @property
+ def computed_doesnt_reflect_as_server_default(self):
+ # note that this rule will go away when SQLAlchemy correctly
+ # supports reflection of the "computed" construct; the element
+ # will consistently be present as both column.computed and
+ # column.server_default for all supported backends.
+ return self.computed_columns + exclusions.skip_if(
+ ["postgresql", "oracle"],
+ "backend reflects computed construct as a server default",
+ )
+
@property
def check_constraint_reflection(self):
return exclusions.fails_on_everything_except(
--- /dev/null
+import sqlalchemy as sa
+from sqlalchemy import Column
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy import Table
+
+from alembic import testing
+from alembic.testing import config
+from alembic.testing import eq_
+from alembic.testing import exclusions
+from alembic.testing import is_
+from alembic.testing import is_true
+from alembic.testing import TestBase
+from ._autogen_fixtures import AutogenFixtureTest
+
+
+class AutogenerateComputedTest(AutogenFixtureTest, TestBase):
+ __requires__ = ("computed_columns",)
+ __backend__ = True
+
+ def test_add_computed_column(self):
+ m1 = MetaData()
+ m2 = MetaData()
+
+ Table("user", m1, Column("id", Integer, primary_key=True))
+
+ Table(
+ "user",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("foo", Integer, sa.Computed("5")),
+ )
+
+ diffs = self._fixture(m1, m2)
+
+ eq_(diffs[0][0], "add_column")
+ eq_(diffs[0][2], "user")
+ eq_(diffs[0][3].name, "foo")
+ c = diffs[0][3].computed
+
+ is_true(isinstance(c, sa.Computed))
+ is_(c.persisted, None)
+ eq_(str(c.sqltext), "5")
+
+ def test_remove_computed_column(self):
+ m1 = MetaData()
+ m2 = MetaData()
+
+ Table(
+ "user",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("foo", Integer, sa.Computed("5")),
+ )
+
+ Table("user", m2, Column("id", Integer, primary_key=True))
+
+ diffs = self._fixture(m1, m2)
+
+ eq_(diffs[0][0], "remove_column")
+ eq_(diffs[0][2], "user")
+ c = diffs[0][3]
+ eq_(c.name, "foo")
+
+ is_(c.computed, None)
+
+ if config.requirements.computed_reflects_as_server_default.enabled:
+ is_true(isinstance(c.server_default, sa.DefaultClause))
+ eq_(str(c.server_default.arg.text), "5")
+ else:
+ is_(c.server_default, None)
+
+ @testing.combinations(
+ lambda: (sa.Computed("5"), sa.Computed("5")),
+ lambda: (sa.Computed("bar*5"), sa.Computed("bar*5")),
+ lambda: (sa.Computed("bar*5"), sa.Computed("bar * 42")),
+ lambda: (
+ sa.Computed("bar*5"),
+ sa.Computed("bar * 42", persisted=True),
+ ),
+ lambda: (None, sa.Computed("bar*5")),
+ (
+ lambda: (sa.Computed("bar*5"), None),
+ config.requirements.computed_doesnt_reflect_as_server_default,
+ ),
+ )
+ def test_computed_unchanged(self, test_case):
+ arg_before, arg_after = testing.resolve_lambda(test_case, **locals())
+ m1 = MetaData()
+ m2 = MetaData()
+
+ arg_before = [] if arg_before is None else [arg_before]
+ arg_after = [] if arg_after is None else [arg_after]
+
+ Table(
+ "user",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("bar", Integer),
+ Column("foo", Integer, *arg_before),
+ )
+
+ Table(
+ "user",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("bar", Integer),
+ Column("foo", Integer, *arg_after),
+ )
+
+ diffs = self._fixture(m1, m2)
+
+ eq_(len(diffs), 0)
+
+ @config.requirements.computed_reflects_as_server_default
+ def test_remove_computed_default_on_computed(self):
+ """Asserts the current behavior which is that on PG and Oracle,
+ the GENERATED ALWAYS AS is reflected as a server default which we can't
+ tell is actually "computed", so these come out as a modification to
+ the server default.
+
+ """
+ m1 = MetaData()
+ m2 = MetaData()
+
+ Table(
+ "user",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("bar", Integer),
+ Column("foo", Integer, sa.Computed("bar + 42")),
+ )
+
+ Table(
+ "user",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("bar", Integer),
+ Column("foo", Integer),
+ )
+
+ diffs = self._fixture(m1, m2)
+
+ eq_(diffs[0][0][0], "modify_default")
+ eq_(diffs[0][0][2], "user")
+ eq_(diffs[0][0][3], "foo")
+ old = diffs[0][0][-2]
+ new = diffs[0][0][-1]
+
+ is_(new, None)
+ is_true(isinstance(old, sa.DefaultClause))
+
+ if exclusions.against(config, "postgresql"):
+ eq_(str(old.arg.text), "(bar + 42)")
+ elif exclusions.against(config, "oracle"):
+ eq_(str(old.arg.text), '"BAR"+42')
from sqlalchemy import PrimaryKeyConstraint
from sqlalchemy import String
from sqlalchemy import Table
-from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import types
from sqlalchemy import Unicode
from alembic import autogenerate
from alembic import op # noqa
+from alembic import testing
from alembic.autogenerate import api
from alembic.migration import MigrationContext
from alembic.operations import ops
"server_default='5', nullable=True))",
)
- @testing.requires.sqlalchemy_13
+ @config.requirements.sqlalchemy_13
@testing.emits_warning("Can't validate argument ")
def test_render_add_column_custom_kwarg(self):
col = Column(
")",
)
+ @config.requirements.computed_columns_api
+ def test_render_add_column_computed(self):
+ c = sa.Computed("5")
+ op_obj = ops.AddColumnOp("foo", Column("x", Integer, c))
+ eq_ignore_whitespace(
+ autogenerate.render_op_text(self.autogen_context, op_obj),
+ "op.add_column('foo', sa.Column('x', sa.Integer(), "
+ "sa.Computed(!U'5', ), nullable=True))",
+ )
+
+ @config.requirements.computed_columns_api
+ @testing.combinations((True,), (False,))
+ def test_render_add_column_computed_persisted(self, persisted):
+ op_obj = ops.AddColumnOp(
+ "foo", Column("x", Integer, sa.Computed("5", persisted=persisted))
+ )
+ eq_ignore_whitespace(
+ autogenerate.render_op_text(self.autogen_context, op_obj),
+ "op.add_column('foo', sa.Column('x', sa.Integer(), "
+ "sa.Computed(!U'5', persisted=%s), nullable=True))" % persisted,
+ )
+
+ @config.requirements.computed_columns_api
+ def test_render_alter_column_computed_modify_default(self):
+ op_obj = ops.AlterColumnOp(
+ "sometable", "somecolumn", modify_server_default=sa.Computed("7")
+ )
+ eq_ignore_whitespace(
+ autogenerate.render_op_text(self.autogen_context, op_obj),
+ "op.alter_column('sometable', 'somecolumn', "
+ "server_default=sa.Computed(!U'7', ))",
+ )
+
+ @config.requirements.computed_columns_api
+ def test_render_alter_column_computed_existing_default(self):
+ op_obj = ops.AlterColumnOp(
+ "sometable",
+ "somecolumn",
+ existing_server_default=sa.Computed("42"),
+ )
+ eq_ignore_whitespace(
+ autogenerate.render_op_text(self.autogen_context, op_obj),
+ "op.alter_column('sometable', 'somecolumn', "
+ "existing_server_default=sa.Computed(!U'42', ))",
+ )
+
+ @config.requirements.computed_columns_api
+ @testing.combinations((True,), (False,))
+ def test_render_alter_column_computed_modify_default_perisisted(
+ self, persisted
+ ):
+ op_obj = ops.AlterColumnOp(
+ "sometable",
+ "somecolumn",
+ modify_server_default=sa.Computed("7", persisted=persisted),
+ )
+ eq_ignore_whitespace(
+ autogenerate.render_op_text(self.autogen_context, op_obj),
+ "op.alter_column('sometable', 'somecolumn', server_default"
+ "=sa.Computed(!U'7', persisted=%s))" % persisted,
+ )
+
+ @config.requirements.computed_columns_api
+ @testing.combinations((True,), (False,))
+ def test_render_alter_column_computed_existing_default_perisisted(
+ self, persisted
+ ):
+ c = sa.Computed("42", persisted=persisted)
+ op_obj = ops.AlterColumnOp(
+ "sometable", "somecolumn", existing_server_default=c
+ )
+ eq_ignore_whitespace(
+ autogenerate.render_op_text(self.autogen_context, op_obj),
+ "op.alter_column('sometable', 'somecolumn', "
+ "existing_server_default=sa.Computed(!U'42', persisted=%s))"
+ % persisted,
+ )
+
class RenderNamingConventionTest(TestBase):
def setUp(self):
from alembic import op
from alembic import util
from alembic.testing import assert_raises_message
+from alembic.testing import config
from alembic.testing import eq_
from alembic.testing.env import _no_sql_testing_config
from alembic.testing.env import clear_staging_env
from alembic.testing.fixtures import capture_context_buffer
from alembic.testing.fixtures import op_fixture
from alembic.testing.fixtures import TestBase
+from alembic.util import sqla_compat
class FullEnvironmentTests(TestBase):
"exec('alter table t drop constraint ' + @const_name)"
)
+ @config.requirements.computed_columns_api
+ def test_add_column_computed(self):
+ context = op_fixture("mssql")
+ op.add_column(
+ "t1",
+ Column("some_column", Integer, sqla_compat.Computed("foo * 5")),
+ )
+ context.assert_("ALTER TABLE t1 ADD some_column AS (foo * 5)")
+
def test_alter_do_everything(self):
context = op_fixture("mssql")
op.alter_column(
from alembic.testing.fixtures import AlterColRoundTripFixture
from alembic.testing.fixtures import op_fixture
from alembic.testing.fixtures import TestBase
+from alembic.util import sqla_compat
class MySQLOpTest(TestBase):
op.drop_table_comment("t2", existing_comment="t2 table", schema="foo")
context.assert_("ALTER TABLE foo.t2 COMMENT ''")
+ @config.requirements.computed_columns_api
+ def test_add_column_computed(self):
+ context = op_fixture("mysql")
+ op.add_column(
+ "t1",
+ Column("some_column", Integer, sqla_compat.Computed("foo * 5")),
+ )
+ context.assert_(
+ "ALTER TABLE t1 ADD COLUMN some_column "
+ "INTEGER GENERATED ALWAYS AS (foo * 5)"
+ )
+
def test_drop_fk(self):
context = op_fixture("mysql")
op.drop_constraint("f1", "t1", "foreignkey")
from sqlalchemy import Boolean
from sqlalchemy import Column
from sqlalchemy import event
+from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
from alembic.testing.fixtures import AlterColRoundTripFixture
from alembic.testing.fixtures import op_fixture
from alembic.testing.fixtures import TestBase
+from alembic.util import sqla_compat
@event.listens_for(Table, "after_parent_attach")
op.alter_column("t", "c", server_default=None, schema="foo")
context.assert_("ALTER TABLE foo.t ALTER COLUMN c DROP DEFAULT")
+ @config.requirements.computed_columns_api
+ def test_alter_column_computed_add_not_supported(self):
+ op_fixture()
+ assert_raises_message(
+ exc.CompileError,
+ 'Adding or removing a "computed" construct, e.g. '
+ "GENERATED ALWAYS AS, to or from an existing column is not "
+ "supported.",
+ op.alter_column,
+ "t1",
+ "c1",
+ server_default=sqla_compat.Computed("foo * 5"),
+ )
+
+ @config.requirements.computed_columns_api
+ def test_alter_column_computed_remove_not_supported(self):
+ op_fixture()
+ assert_raises_message(
+ exc.CompileError,
+ 'Adding or removing a "computed" construct, e.g. '
+ "GENERATED ALWAYS AS, to or from an existing column is not "
+ "supported.",
+ op.alter_column,
+ "t1",
+ "c1",
+ server_default=None,
+ existing_server_default=sqla_compat.Computed("foo * 5"),
+ )
+
def test_alter_column_schema_type_unnamed(self):
context = op_fixture("mssql", native_boolean=False)
op.alter_column("t", "c", type_=Boolean())
from alembic.testing.fixtures import capture_context_buffer
from alembic.testing.fixtures import op_fixture
from alembic.testing.fixtures import TestBase
+from alembic.util import sqla_compat
class FullEnvironmentTests(TestBase):
"COMMENT ON COLUMN t1.c1 IS 'c1 comment'",
)
+ @config.requirements.computed_columns_api
+ def test_add_column_computed(self):
+ context = op_fixture("oracle")
+ op.add_column(
+ "t1",
+ Column("some_column", Integer, sqla_compat.Computed("foo * 5")),
+ )
+ context.assert_(
+ "ALTER TABLE t1 ADD some_column "
+ "INTEGER GENERATED ALWAYS AS (foo * 5)"
+ )
+
def test_alter_column_rename_oracle(self):
context = op_fixture("oracle")
op.alter_column("t", "c", name="x")
from alembic.testing.fixtures import capture_context_buffer
from alembic.testing.fixtures import op_fixture
from alembic.testing.fixtures import TestBase
+from alembic.util import sqla_compat
class PostgresqlOpTest(TestBase):
op.drop_table_comment("t2", existing_comment="t2 table", schema="foo")
context.assert_("COMMENT ON TABLE foo.t2 IS NULL")
+ @config.requirements.computed_columns_api
+ def test_add_column_computed(self):
+ context = op_fixture("postgresql")
+ op.add_column(
+ "t1",
+ Column("some_column", Integer, sqla_compat.Computed("foo * 5")),
+ )
+ context.assert_(
+ "ALTER TABLE t1 ADD COLUMN some_column "
+ "INTEGER GENERATED ALWAYS AS (foo * 5) STORED"
+ )
+
class PGAutocommitBlockTest(TestBase):
__only_on__ = "postgresql"