--- /dev/null
+.. change::
+ :tags: bug, schema
+ :tickets: 8925
+
+ Stricter rules are in place for appending of :class:`.Column` objects to
+ :class:`.Table` objects, both moving some previous deprecation warnings to
+ exceptions, and preventing some previous scenarios that would cause
+ duplicate columns to appear in tables, when
+ :paramref:`.Table.extend_existing` were set to ``True``, for both
+ programmatic :class:`.Table` construction as well as during reflection
+ operations.
+
+ See :ref:`change_8925` for a rundown of these changes.
+
+ .. seealso::
+
+ :ref:`change_8925`
:ticket:`8567`
+.. _change_8925:
+
+Stricter rules for replacement of Columns in Table objects with same-names, keys
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Stricter rules are in place for appending of :class:`.Column` objects to
+:class:`.Table` objects, both moving some previous deprecation warnings to
+exceptions, and preventing some previous scenarios that would cause
+duplicate columns to appear in tables, when
+:paramref:`.Table.extend_existing` were set to ``True``, for both
+programmatic :class:`.Table` construction as well as during reflection
+operations.
+
+* Under no circumstances should a :class:`.Table` object ever have two or more
+ :class:`.Column` objects with the same name, regardless of what .key they
+ have. An edge case where this was still possible was identified and fixed.
+
+* Adding a :class:`.Column` to a :class:`.Table` that has the same name or
+ key as an existing :class:`.Column` will always raise
+ :class:`.DuplicateColumnError` (a new subclass of :class:`.ArgumentError` in
+ 2.0.0b4) unless additional parameters are present;
+ :paramref:`.Table.append_column.replace_existing` for
+ :meth:`.Table.append_column`, and :paramref:`.Table.extend_existing` for
+ construction of a same-named :class:`.Table` as an existing one, with or
+ without reflection being used. Previously, there was a deprecation warning in
+ place for this scenario.
+
+* A warning is now emitted if a :class:`.Table` is created, that does
+ include :paramref:`.Table.extend_existing`, where an incoming
+ :class:`.Column` that has no separate :attr:`.Column.key` would fully
+ replace an existing :class:`.Column` that does have a key, which suggests
+ the operation is not what the user intended. This can happen particularly
+ during a secondary reflection step, such as ``metadata.reflect(extend_existing=True)``.
+ The warning suggests that the :paramref:`.Table.autoload_replace` parameter
+ be set to ``False`` to prevent this. Previously, in 1.4 and earlier, the
+ incoming column would be added **in addition** to the existing column.
+ This was a bug and is a behavioral change in 2.0 (as of 2.0.0b4), as the
+ previous key will **no longer be present** in the column collection
+ when this occurs.
+
+
+:ticket:`8925`
.. _change_7211:
"""
+class DuplicateColumnError(ArgumentError):
+ """a Column is being added to a Table that would replace another
+ Column, without appropriate parameters to allow this in place.
+
+ .. versionadded:: 2.0.0b4
+
+ """
+
+
class ObjectNotExecutableError(ArgumentError):
"""Raised when an object is passed to .execute() that can't be
executed as SQL.
# delete higher index
del self._index[len(self._collection)]
- def replace(self, column: _NAMEDCOL) -> None:
+ def replace(
+ self,
+ column: _NAMEDCOL,
+ extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
+ ) -> None:
"""add the given column to this collection, removing unaliased
versions of this column as well as existing columns with the
same key.
"""
- remove_col = set()
+ if extra_remove:
+ remove_col = set(extra_remove)
+ else:
+ remove_col = set()
# remove up to two columns based on matches of name as well as key
if column.name in self._index and column.key != column.name:
other = self._index[column.name][1]
allow_replacements=extend_existing
or keep_existing
or autoload_with,
+ all_names={},
)
def _autoload(
schema = kwargs.pop("schema", None)
_extend_on = kwargs.pop("_extend_on", None)
_reflect_info = kwargs.pop("_reflect_info", None)
+
# these arguments are only used with _init()
- kwargs.pop("extend_existing", False)
- kwargs.pop("keep_existing", False)
+ extend_existing = kwargs.pop("extend_existing", False)
+ keep_existing = kwargs.pop("keep_existing", False)
+
+ assert extend_existing
+ assert not keep_existing
if schema and schema != self.schema:
raise exc.ArgumentError(
_reflect_info=_reflect_info,
)
+ all_names = {c.name: c for c in self.c}
self._extra_kwargs(**kwargs)
- self._init_items(*args)
+ self._init_items(*args, allow_replacements=True, all_names=all_names)
def _extra_kwargs(self, **kwargs: Any) -> None:
self._validate_dialect_kwargs(kwargs)
.. versionadded:: 1.4.0
"""
- column._set_parent_with_dispatch(
- self, allow_replacements=replace_existing
- )
+ try:
+ column._set_parent_with_dispatch(
+ self,
+ allow_replacements=replace_existing,
+ all_names={c.name: c for c in self.c},
+ )
+ except exc.DuplicateColumnError as de:
+ raise exc.DuplicateColumnError(
+ f"{de.args[0]} Specify replace_existing=True to "
+ "Table.append_column() to replace an "
+ "existing column."
+ ) from de
def append_constraint(self, constraint: Union[Index, Constraint]) -> None:
"""Append a :class:`_schema.Constraint` to this
+ ["%s=%s" % (k, repr(getattr(self, k))) for k in kwarg]
)
- def _set_parent(
+ def _set_parent( # type: ignore[override]
self,
parent: SchemaEventTarget,
- allow_replacements: bool = True,
+ *,
+ all_names: Dict[str, Column[Any]],
+ allow_replacements: bool,
**kw: Any,
) -> None:
table = parent
% (self.key, existing.description)
)
+ extra_remove = None
+ existing_col = None
+ conflicts_on = ""
+
if self.key in table._columns:
- col = table._columns[self.key]
- if col is not self:
+ existing_col = table._columns[self.key]
+ if self.key == self.name:
+ conflicts_on = "name"
+ else:
+ conflicts_on = "key"
+ elif self.name in all_names:
+ existing_col = all_names[self.name]
+ extra_remove = {existing_col}
+ conflicts_on = "name"
+
+ if existing_col is not None:
+ if existing_col is not self:
if not allow_replacements:
- util.warn_deprecated(
- "A column with name '%s' is already present "
- "in table '%s'. Please use method "
- ":meth:`_schema.Table.append_column` with the "
- "parameter ``replace_existing=True`` to replace an "
- "existing column." % (self.key, table.name),
- "1.4",
+ raise exc.DuplicateColumnError(
+ f"A column with {conflicts_on} "
+ f"""'{
+ self.key if conflicts_on == 'key' else self.name
+ }' """
+ f"is already present in table '{table.name}'."
)
- for fk in col.foreign_keys:
+ for fk in existing_col.foreign_keys:
table.foreign_keys.remove(fk)
if fk.constraint in table.constraints:
# this might have been removed
# and more than one col being replaced
table.constraints.remove(fk.constraint)
- table._columns.replace(self)
-
+ if extra_remove and existing_col is not None and self.key == self.name:
+ util.warn(
+ f'Column with user-specified key "{existing_col.key}" is '
+ "being replaced with "
+ f'plain named column "{self.name}", '
+ f'key "{existing_col.key}" is being removed. If this is a '
+ "reflection operation, specify autoload_replace=False to "
+ "prevent this replacement."
+ )
+ table._columns.replace(self, extra_remove=extra_remove)
+ all_names[self.name] = self
self.table = table
if self.primary_key:
from .config import fixture
from .config import requirements as requires
from .config import skip_test
+from .config import Variation
from .config import variation
from .exclusions import _is_excluded
from .exclusions import _server_version
from typing import Any
from typing import Callable
from typing import Iterable
+from typing import NoReturn
from typing import Optional
from typing import Tuple
from typing import TypeVar
from typing import Union
+from .util import fail
from .. import util
requirements = None
return combinations(*arg_iterable, **kw)
-class _variation_base:
- __slots__ = ("name", "argname")
+class Variation:
+ __slots__ = ("_name", "_argname")
def __init__(self, case, argname, case_names):
- self.name = case
- self.argname = argname
+ self._name = case
+ self._argname = argname
for casename in case_names:
setattr(self, casename, casename == case)
+ if typing.TYPE_CHECKING:
+
+ def __getattr__(self, key: str) -> bool:
+ ...
+
+ @property
+ def name(self):
+ return self._name
+
def __bool__(self):
- return self.name == self.argname
+ return self._name == self._argname
def __nonzero__(self):
return not self.__bool__()
+ def __str__(self):
+ return f"{self._argname}={self._name!r}"
+
+ def fail(self) -> NoReturn:
+ fail(f"Unknown {self}")
+
def variation(argname, cases):
"""a helper around testing.combinations that provides a single namespace
typ = type(
argname,
- (_variation_base,),
+ (Variation,),
{
"__slots__": tuple(case_names),
},
(
[
sa_exceptions.ArgumentError,
+ sa_exceptions.DuplicateColumnError,
sa_exceptions.NoSuchModuleError,
sa_exceptions.NoForeignKeysError,
sa_exceptions.AmbiguousForeignKeysError,
)
assert "nonexistent" not in meta.tables
- def test_extend_existing(self, connection, metadata):
+ @testing.variation("use_metadata_reflect", [True, False])
+ def test_extend_existing(self, connection, metadata, use_metadata_reflect):
meta = metadata
Table(
old_q = Column("q", Integer)
t2 = Table("t", m2, old_z, old_q)
eq_(list(t2.primary_key.columns), [t2.c.z])
+
t2 = Table(
"t",
m2,
extend_existing=True,
autoload_with=connection,
)
+ if use_metadata_reflect:
+ m2.reflect(connection, extend_existing=True)
eq_(set(t2.columns.keys()), {"x", "y", "z", "q", "id"})
# this has been the actual behavior, the cols are added together,
eq_(list(t2.primary_key.columns), [t2.c.z, t2.c.id])
assert t2.c.z is not old_z
- assert t2.c.y is old_y
+ if not use_metadata_reflect:
+ assert t2.c.y is old_y
assert t2.c.z.type._type_affinity is Integer
assert t2.c.q is old_q
extend_existing=False,
autoload_with=connection,
)
+ if use_metadata_reflect:
+ m3.reflect(connection, extend_existing=False)
eq_(set(t3.columns.keys()), {"z"})
m4 = MetaData()
autoload_replace=False,
autoload_with=connection,
)
+ if use_metadata_reflect:
+ m4.reflect(
+ connection, extend_existing=True, autoload_replace=False
+ )
eq_(set(t4.columns.keys()), {"x", "y", "z", "q", "id"})
eq_(list(t4.primary_key.columns), [t4.c.z, t4.c.id])
assert t4.c.z is old_z
assert t4.c.z.type._type_affinity is String
assert t4.c.q is old_q
+ @testing.variation(
+ "extend_type",
+ [
+ "autoload",
+ "metadata_reflect",
+ "metadata_reflect_no_replace",
+ "plain_table",
+ ],
+ )
+ def test_extend_existing_never_dupe_column(
+ self, connection, metadata, extend_type
+ ):
+ """test #8925"""
+ meta = metadata
+
+ Table(
+ "t",
+ meta,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ Column("y", Integer),
+ )
+ meta.create_all(connection)
+
+ m2 = MetaData()
+ if extend_type.metadata_reflect:
+ t2 = Table(
+ "t",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer, key="x2"),
+ )
+ with expect_warnings(
+ 'Column with user-specified key "x2" is being replaced '
+ 'with plain named column "x", key "x2" is being removed.'
+ ):
+ m2.reflect(connection, extend_existing=True)
+ eq_(set(t2.columns.keys()), {"x", "y", "id"})
+ elif extend_type.metadata_reflect_no_replace:
+ t2 = Table(
+ "t",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer, key="x2"),
+ )
+ m2.reflect(
+ connection, extend_existing=True, autoload_replace=False
+ )
+ eq_(set(t2.columns.keys()), {"x2", "y", "id"})
+ elif extend_type.autoload:
+ t2 = Table(
+ "t",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer, key="x2"),
+ autoload_with=connection,
+ extend_existing=True,
+ )
+ eq_(set(t2.columns.keys()), {"x2", "y", "id"})
+ elif extend_type.plain_table:
+ Table(
+ "t",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer, key="x2"),
+ )
+ with expect_warnings(
+ 'Column with user-specified key "x2" is being replaced with '
+ 'plain named column "x", key "x2" is being removed.'
+ ):
+ t2 = Table(
+ "t",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ Column("y", Integer),
+ extend_existing=True,
+ )
+ eq_(set(t2.columns.keys()), {"x", "y", "id"})
+
def test_extend_existing_reflect_all_dont_dupe_index(
self, connection, metadata
):
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import assertions
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
id = Column(Integer, primary_key=True)
def test_column_named_twice(self):
- with assertions.expect_deprecated(
- "A column with name 'x' is already present in table 'foo'"
- ), expect_warnings(
- "On class 'Foo', Column object 'x' named directly multiple times, "
- "only one will be used: x, y",
- ):
+ with expect_warnings(
+ "On class 'Foo', Column object 'x' named directly multiple "
+ "times, only one will be used: x, y. Consider using "
+ "orm.synonym instead"
+ ), expect_raises(exc.DuplicateColumnError):
class Foo(Base):
__tablename__ = "foo"
y = Column("x", Integer)
def test_column_repeated_under_prop(self):
- with assertions.expect_deprecated(
- "A column with name 'x' is already present in table 'foo'"
- ), expect_warnings(
- "On class 'Foo', Column object 'x' named directly multiple times, "
- "only one will be used: x, y, z",
- ):
+ with expect_warnings(
+ "On class 'Foo', Column object 'x' named directly multiple "
+ "times, only one will be used: x, y, z. Consider using "
+ "orm.synonym instead"
+ ), expect_raises(exc.DuplicateColumnError):
class Foo(Base):
__tablename__ = "foo"
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
+from sqlalchemy.testing import Variation
from sqlalchemy.testing.assertions import expect_warnings
is_(t2.c.contains_column(g), False)
def test_table_ctor_duplicated_column_name(self):
- def go():
- return Table(
+ # when it will raise
+ with testing.expect_raises_message(
+ exc.ArgumentError,
+ "A column with name 'col' is already present in table 't'",
+ ):
+ Table(
"t",
MetaData(),
Column("a", Integer),
Column("col", String),
)
- with testing.expect_deprecated(
- "A column with name 'col' is already present in table 't'",
- ):
- t = go()
- is_true(isinstance(t.c.col.type, String))
- # when it will raise
- # with testing.expect_raises_message(
- # exc.ArgumentError,
- # "A column with name 'col' is already present in table 't'",
- # ):
- # go()
-
def test_append_column_existing_name(self):
t = Table("t", MetaData(), Column("col", Integer))
- with testing.expect_deprecated(
- "A column with name 'col' is already present in table 't'",
+ with testing.expect_raises_message(
+ exc.DuplicateColumnError,
+ r"A column with name 'col' is already present in table 't'. "
+ r"Specify replace_existing=True to Table.append_column\(\) to "
+ r"replace an existing column.",
):
t.append_column(Column("col", String))
- is_true(isinstance(t.c.col.type, String))
- # when it will raise
- # col = t.c.col
- # with testing.expect_raises_message(
- # exc.ArgumentError,
- # "A column with name 'col' is already present in table 't'",
- # ):
- # t.append_column(Column("col", String))
- # is_true(t.c.col is col)
-
- def test_append_column_replace_existing(self):
- t = Table("t", MetaData(), Column("col", Integer))
- t.append_column(Column("col", String), replace_existing=True)
- is_true(isinstance(t.c.col.type, String))
+
+ def test_append_column_existing_key(self):
+ t = Table("t", MetaData(), Column("col", Integer, key="c2"))
+
+ with testing.expect_raises_message(
+ exc.DuplicateColumnError,
+ r"A column with key 'c2' is already present in table 't'. "
+ r"Specify replace_existing=True to Table.append_column\(\) "
+ r"to replace an existing column.",
+ ):
+ t.append_column(Column("col", String, key="c2"))
+
+ @testing.variation("field", ["name", "key"])
+ def test_append_column_replace_existing(self, field: Variation):
+ if field.name:
+ t = Table("t", MetaData(), Column("col", Integer))
+ t.append_column(Column("col", String), replace_existing=True)
+ is_true(isinstance(t.c.col.type, String))
+ elif field.key:
+ t = Table("t", MetaData(), Column("col", Integer, key="c2"))
+ t.append_column(
+ Column("col", String, key="c2"), replace_existing=True
+ )
+ is_true(isinstance(t.c.c2.type, String))
+ else:
+ field.fail()
def test_autoincrement_replace(self):
m = MetaData()
assert t2.index("CREATE TABLE someschema.table2") > -1
-class UseExistingTest(fixtures.TablesTest):
+class UseExistingTest(testing.AssertsCompiledSQL, fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
Table(
def empty_meta(self):
return MetaData()
+ @testing.variation(
+ "scenario",
+ [
+ "inplace",
+ "inplace_ee",
+ "separate_ee_key_first",
+ "separate_ee_key_second",
+ "separate_ee_key_append_no_replace",
+ "separate_ee_key_append_replace",
+ ],
+ )
+ @testing.variation("both_have_keys", [True, False])
+ def test_table_w_two_same_named_columns(
+ self, empty_meta, scenario: Variation, both_have_keys: Variation
+ ):
+
+ if scenario.inplace:
+ with expect_raises_message(
+ exc.DuplicateColumnError,
+ "A column with name 'b' is already present in table 'users'.",
+ ):
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b1" if both_have_keys else None),
+ Column("b", String, key="b2"),
+ )
+ return
+ elif scenario.inplace_ee:
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b1" if both_have_keys else None),
+ Column("b", String, key="b2"),
+ extend_existing=True,
+ )
+ elif scenario.separate_ee_key_first:
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b2"),
+ )
+
+ expected_warnings = (
+ [
+ 'Column with user-specified key "b2" is being '
+ 'replaced with plain named column "b", key "b2" '
+ "is being removed."
+ ]
+ if not both_have_keys
+ else []
+ )
+ with expect_warnings(*expected_warnings):
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b1" if both_have_keys else None),
+ extend_existing=True,
+ )
+ elif scenario.separate_ee_key_second:
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b1" if both_have_keys else None),
+ )
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b2"),
+ extend_existing=True,
+ )
+ elif scenario.separate_ee_key_append_no_replace:
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b1" if both_have_keys else None),
+ )
+ with expect_raises_message(
+ exc.DuplicateColumnError,
+ r"A column with name 'b' is already present in table 'users'. "
+ r"Specify replace_existing=True to Table.append_column\(\) "
+ r"to replace an existing column.",
+ ):
+ t1.append_column(Column("b", String, key="b2"))
+ return
+ elif scenario.separate_ee_key_append_replace:
+ t1 = Table(
+ "users",
+ empty_meta,
+ Column("a", String),
+ Column("b", String, key="b1" if both_have_keys else None),
+ )
+ t1.append_column(
+ Column("b", String, key="b2"), replace_existing=True
+ )
+
+ else:
+ scenario.fail()
+
+ if scenario.separate_ee_key_first:
+ if both_have_keys:
+ eq_(t1.c.keys(), ["a", "b1"])
+ else:
+ eq_(t1.c.keys(), ["a", "b"])
+ else:
+ eq_(t1.c.keys(), ["a", "b2"])
+ self.assert_compile(select(t1), "SELECT users.a, users.b FROM users")
+
def test_exception_no_flags(self, existing_meta):
def go():
Table(