]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
disallow same-named columns, unchecked replacement in Table
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 3 Dec 2022 14:11:14 +0000 (09:11 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 4 Dec 2022 15:45:35 +0000 (10:45 -0500)
Fixed issue where table reflection using :paramref:`.Table.extend_existing`
would fail to deduplicate a same-named column if the existing
:class:`.Table` used a separate key. The
:paramref:`.Table.autoload_replace` parameter would allow the column to be
skipped but under no circumstances should a :class:`.Table` ever have the
same-named column twice.

Additionally, changed deprecation warnings to exceptions
as were implemented in I1d58c8ebe081079cb669e7ead60886ffc1b1a7f5 .

Fixes: #8925
Change-Id: I83d0f8658177a7ffbb06e01dbca91377d1a98d49

doc/build/changelog/unreleased_20/8925.rst [new file with mode: 0644]
doc/build/changelog/whatsnew_20.rst
lib/sqlalchemy/exc.py
lib/sqlalchemy/sql/base.py
lib/sqlalchemy/sql/schema.py
lib/sqlalchemy/testing/__init__.py
lib/sqlalchemy/testing/config.py
test/base/test_except.py
test/engine/test_reflection.py
test/orm/declarative/test_basic.py
test/sql/test_metadata.py

diff --git a/doc/build/changelog/unreleased_20/8925.rst b/doc/build/changelog/unreleased_20/8925.rst
new file mode 100644 (file)
index 0000000..23c7453
--- /dev/null
@@ -0,0 +1,17 @@
+.. change::
+    :tags: bug, schema
+    :tickets: 8925
+
+    Stricter rules are in place for appending of :class:`.Column` objects to
+    :class:`.Table` objects, both moving some previous deprecation warnings to
+    exceptions, and preventing some previous scenarios that would cause
+    duplicate columns to appear in tables, when
+    :paramref:`.Table.extend_existing` were set to ``True``, for both
+    programmatic :class:`.Table` construction as well as during reflection
+    operations.
+
+    See :ref:`change_8925` for a rundown of these changes.
+
+    .. seealso::
+
+        :ref:`change_8925`
index 97b4e93a0f71fe3b6345b839f2ef0690c2bf1a74..16c4115eb0eca1568866c6b92fc66711d6f0e1db 100644 (file)
@@ -1618,6 +1618,48 @@ as ``False``::
 
 :ticket:`8567`
 
+.. _change_8925:
+
+Stricter rules for replacement of Columns in Table objects with same-names, keys
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Stricter rules are in place for appending of :class:`.Column` objects to
+:class:`.Table` objects, both moving some previous deprecation warnings to
+exceptions, and preventing some previous scenarios that would cause
+duplicate columns to appear in tables, when
+:paramref:`.Table.extend_existing` were set to ``True``, for both
+programmatic :class:`.Table` construction as well as during reflection
+operations.
+
+* Under no circumstances should a :class:`.Table` object ever have two or more
+  :class:`.Column` objects with the same name, regardless of what .key they
+  have.  An edge case where this was still possible was identified and fixed.
+
+* Adding a :class:`.Column` to a :class:`.Table` that has the same name or
+  key as an existing :class:`.Column` will always raise
+  :class:`.DuplicateColumnError` (a new subclass of :class:`.ArgumentError` in
+  2.0.0b4) unless additional parameters are present;
+  :paramref:`.Table.append_column.replace_existing` for
+  :meth:`.Table.append_column`, and :paramref:`.Table.extend_existing` for
+  construction of a same-named :class:`.Table` as an existing one, with or
+  without reflection being used. Previously, there was a deprecation warning in
+  place for this scenario.
+
+* A warning is now emitted if a :class:`.Table` is created, that does
+  include :paramref:`.Table.extend_existing`, where an incoming
+  :class:`.Column` that has no separate :attr:`.Column.key` would fully
+  replace an existing :class:`.Column` that does have a key, which suggests
+  the operation is not what the user intended.  This can happen particularly
+  during a secondary reflection step, such as ``metadata.reflect(extend_existing=True)``.
+  The warning suggests that the :paramref:`.Table.autoload_replace` parameter
+  be set to ``False`` to prevent this. Previously, in 1.4 and earlier, the
+  incoming column would be added **in addition** to the existing column.
+  This was a bug and is a behavioral change in 2.0 (as of 2.0.0b4), as the
+  previous key will **no longer be present** in the column collection
+  when this occurs.
+
+
+:ticket:`8925`
 
 .. _change_7211:
 
index fa46a46c4b71b2006d6d3eb8b533b202f207d5b5..c1f1a9c1c98d8e40af29adbeff8c639ef30d281a 100644 (file)
@@ -125,6 +125,15 @@ class ArgumentError(SQLAlchemyError):
     """
 
 
+class DuplicateColumnError(ArgumentError):
+    """a Column is being added to a Table that would replace another
+    Column, without appropriate parameters to allow this in place.
+
+    .. versionadded:: 2.0.0b4
+
+    """
+
+
 class ObjectNotExecutableError(ArgumentError):
     """Raised when an object is passed to .execute() that can't be
     executed as SQL.
index fc80334e874582606c7d1627657e02663d0bc41e..0b96e5bbf2dc00bfcf2d69fe7b841cdc0b3811f6 100644 (file)
@@ -1969,7 +1969,11 @@ class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
         # delete higher index
         del self._index[len(self._collection)]
 
-    def replace(self, column: _NAMEDCOL) -> None:
+    def replace(
+        self,
+        column: _NAMEDCOL,
+        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
+    ) -> None:
         """add the given column to this collection, removing unaliased
         versions of this column  as well as existing columns with the
         same key.
@@ -1986,7 +1990,10 @@ class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
 
         """
 
-        remove_col = set()
+        if extra_remove:
+            remove_col = set(extra_remove)
+        else:
+            remove_col = set()
         # remove up to two columns based on matches of name as well as key
         if column.name in self._index and column.key != column.name:
             other = self._index[column.name][1]
index 2d04b28a8710e5621d5d6290b45aef1ab2d5c5b7..cb28564d14dc5c3b6dfb9982cac630dcfdb80465 100644 (file)
@@ -872,6 +872,7 @@ class Table(
             allow_replacements=extend_existing
             or keep_existing
             or autoload_with,
+            all_names={},
         )
 
     def _autoload(
@@ -936,9 +937,13 @@ class Table(
         schema = kwargs.pop("schema", None)
         _extend_on = kwargs.pop("_extend_on", None)
         _reflect_info = kwargs.pop("_reflect_info", None)
+
         # these arguments are only used with _init()
-        kwargs.pop("extend_existing", False)
-        kwargs.pop("keep_existing", False)
+        extend_existing = kwargs.pop("extend_existing", False)
+        keep_existing = kwargs.pop("keep_existing", False)
+
+        assert extend_existing
+        assert not keep_existing
 
         if schema and schema != self.schema:
             raise exc.ArgumentError(
@@ -987,8 +992,9 @@ class Table(
                 _reflect_info=_reflect_info,
             )
 
+        all_names = {c.name: c for c in self.c}
         self._extra_kwargs(**kwargs)
-        self._init_items(*args)
+        self._init_items(*args, allow_replacements=True, all_names=all_names)
 
     def _extra_kwargs(self, **kwargs: Any) -> None:
         self._validate_dialect_kwargs(kwargs)
@@ -1070,9 +1076,18 @@ class Table(
             .. versionadded:: 1.4.0
         """
 
-        column._set_parent_with_dispatch(
-            self, allow_replacements=replace_existing
-        )
+        try:
+            column._set_parent_with_dispatch(
+                self,
+                allow_replacements=replace_existing,
+                all_names={c.name: c for c in self.c},
+            )
+        except exc.DuplicateColumnError as de:
+            raise exc.DuplicateColumnError(
+                f"{de.args[0]} Specify replace_existing=True to "
+                "Table.append_column() to replace an "
+                "existing column."
+            ) from de
 
     def append_constraint(self, constraint: Union[Index, Constraint]) -> None:
         """Append a :class:`_schema.Constraint` to this
@@ -2099,10 +2114,12 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
             + ["%s=%s" % (k, repr(getattr(self, k))) for k in kwarg]
         )
 
-    def _set_parent(
+    def _set_parent(  # type: ignore[override]
         self,
         parent: SchemaEventTarget,
-        allow_replacements: bool = True,
+        *,
+        all_names: Dict[str, Column[Any]],
+        allow_replacements: bool,
         **kw: Any,
     ) -> None:
         table = parent
@@ -2125,19 +2142,32 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
                 % (self.key, existing.description)
             )
 
+        extra_remove = None
+        existing_col = None
+        conflicts_on = ""
+
         if self.key in table._columns:
-            col = table._columns[self.key]
-            if col is not self:
+            existing_col = table._columns[self.key]
+            if self.key == self.name:
+                conflicts_on = "name"
+            else:
+                conflicts_on = "key"
+        elif self.name in all_names:
+            existing_col = all_names[self.name]
+            extra_remove = {existing_col}
+            conflicts_on = "name"
+
+        if existing_col is not None:
+            if existing_col is not self:
                 if not allow_replacements:
-                    util.warn_deprecated(
-                        "A column with name '%s' is already present "
-                        "in table '%s'. Please use method "
-                        ":meth:`_schema.Table.append_column` with the "
-                        "parameter ``replace_existing=True`` to replace an "
-                        "existing column." % (self.key, table.name),
-                        "1.4",
+                    raise exc.DuplicateColumnError(
+                        f"A column with {conflicts_on} "
+                        f"""'{
+                            self.key if conflicts_on == 'key' else self.name
+                        }' """
+                        f"is already present in table '{table.name}'."
                     )
-                for fk in col.foreign_keys:
+                for fk in existing_col.foreign_keys:
                     table.foreign_keys.remove(fk)
                     if fk.constraint in table.constraints:
                         # this might have been removed
@@ -2145,8 +2175,17 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
                         # and more than one col being replaced
                         table.constraints.remove(fk.constraint)
 
-        table._columns.replace(self)
-
+        if extra_remove and existing_col is not None and self.key == self.name:
+            util.warn(
+                f'Column with user-specified key "{existing_col.key}" is '
+                "being replaced with "
+                f'plain named column "{self.name}", '
+                f'key "{existing_col.key}" is being removed.  If this is a '
+                "reflection operation, specify autoload_replace=False to "
+                "prevent this replacement."
+            )
+        table._columns.replace(self, extra_remove=extra_remove)
+        all_names[self.name] = self
         self.table = table
 
         if self.primary_key:
index 3a028f0027e1acde53b68d8b0f367e8e1cf054fa..993fc4954fe33c2a1a9fb670223a890606689972 100644 (file)
@@ -54,6 +54,7 @@ from .config import db
 from .config import fixture
 from .config import requirements as requires
 from .config import skip_test
+from .config import Variation
 from .config import variation
 from .exclusions import _is_excluded
 from .exclusions import _server_version
index a75c36776484d35f40276e6fd8d3078cc7da0015..9578765798402a7b414b8ddba64d0b21b5486a37 100644 (file)
@@ -14,11 +14,13 @@ import typing
 from typing import Any
 from typing import Callable
 from typing import Iterable
+from typing import NoReturn
 from typing import Optional
 from typing import Tuple
 from typing import TypeVar
 from typing import Union
 
+from .util import fail
 from .. import util
 
 requirements = None
@@ -128,21 +130,36 @@ def combinations_list(
     return combinations(*arg_iterable, **kw)
 
 
-class _variation_base:
-    __slots__ = ("name", "argname")
+class Variation:
+    __slots__ = ("_name", "_argname")
 
     def __init__(self, case, argname, case_names):
-        self.name = case
-        self.argname = argname
+        self._name = case
+        self._argname = argname
         for casename in case_names:
             setattr(self, casename, casename == case)
 
+    if typing.TYPE_CHECKING:
+
+        def __getattr__(self, key: str) -> bool:
+            ...
+
+    @property
+    def name(self):
+        return self._name
+
     def __bool__(self):
-        return self.name == self.argname
+        return self._name == self._argname
 
     def __nonzero__(self):
         return not self.__bool__()
 
+    def __str__(self):
+        return f"{self._argname}={self._name!r}"
+
+    def fail(self) -> NoReturn:
+        fail(f"Unknown {self}")
+
 
 def variation(argname, cases):
     """a helper around testing.combinations that provides a single namespace
@@ -193,7 +210,7 @@ def variation(argname, cases):
 
     typ = type(
         argname,
-        (_variation_base,),
+        (Variation,),
         {
             "__slots__": tuple(case_names),
         },
index a458afb97c5e977387e3e59168e779335b29d6f6..2bf7d1eb8cf18d7a8bc68531429e67216abdfbf3 100644 (file)
@@ -428,6 +428,7 @@ ALL_EXC = [
     (
         [
             sa_exceptions.ArgumentError,
+            sa_exceptions.DuplicateColumnError,
             sa_exceptions.NoSuchModuleError,
             sa_exceptions.NoForeignKeysError,
             sa_exceptions.AmbiguousForeignKeysError,
index 81b85df08d6a95313bec95ce773e2707db88d630..1678e8143956cec8e6c5c15c74416c256210ad32 100644 (file)
@@ -257,7 +257,8 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
         )
         assert "nonexistent" not in meta.tables
 
-    def test_extend_existing(self, connection, metadata):
+    @testing.variation("use_metadata_reflect", [True, False])
+    def test_extend_existing(self, connection, metadata, use_metadata_reflect):
         meta = metadata
 
         Table(
@@ -276,6 +277,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
         old_q = Column("q", Integer)
         t2 = Table("t", m2, old_z, old_q)
         eq_(list(t2.primary_key.columns), [t2.c.z])
+
         t2 = Table(
             "t",
             m2,
@@ -283,6 +285,8 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
             extend_existing=True,
             autoload_with=connection,
         )
+        if use_metadata_reflect:
+            m2.reflect(connection, extend_existing=True)
         eq_(set(t2.columns.keys()), {"x", "y", "z", "q", "id"})
 
         # this has been the actual behavior, the cols are added together,
@@ -290,7 +294,8 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
         eq_(list(t2.primary_key.columns), [t2.c.z, t2.c.id])
 
         assert t2.c.z is not old_z
-        assert t2.c.y is old_y
+        if not use_metadata_reflect:
+            assert t2.c.y is old_y
         assert t2.c.z.type._type_affinity is Integer
         assert t2.c.q is old_q
 
@@ -302,6 +307,8 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
             extend_existing=False,
             autoload_with=connection,
         )
+        if use_metadata_reflect:
+            m3.reflect(connection, extend_existing=False)
         eq_(set(t3.columns.keys()), {"z"})
 
         m4 = MetaData()
@@ -318,6 +325,10 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
             autoload_replace=False,
             autoload_with=connection,
         )
+        if use_metadata_reflect:
+            m4.reflect(
+                connection, extend_existing=True, autoload_replace=False
+            )
         eq_(set(t4.columns.keys()), {"x", "y", "z", "q", "id"})
         eq_(list(t4.primary_key.columns), [t4.c.z, t4.c.id])
         assert t4.c.z is old_z
@@ -325,6 +336,86 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
         assert t4.c.z.type._type_affinity is String
         assert t4.c.q is old_q
 
+    @testing.variation(
+        "extend_type",
+        [
+            "autoload",
+            "metadata_reflect",
+            "metadata_reflect_no_replace",
+            "plain_table",
+        ],
+    )
+    def test_extend_existing_never_dupe_column(
+        self, connection, metadata, extend_type
+    ):
+        """test #8925"""
+        meta = metadata
+
+        Table(
+            "t",
+            meta,
+            Column("id", Integer, primary_key=True),
+            Column("x", Integer),
+            Column("y", Integer),
+        )
+        meta.create_all(connection)
+
+        m2 = MetaData()
+        if extend_type.metadata_reflect:
+            t2 = Table(
+                "t",
+                m2,
+                Column("id", Integer, primary_key=True),
+                Column("x", Integer, key="x2"),
+            )
+            with expect_warnings(
+                'Column with user-specified key "x2" is being replaced '
+                'with plain named column "x", key "x2" is being removed.'
+            ):
+                m2.reflect(connection, extend_existing=True)
+            eq_(set(t2.columns.keys()), {"x", "y", "id"})
+        elif extend_type.metadata_reflect_no_replace:
+            t2 = Table(
+                "t",
+                m2,
+                Column("id", Integer, primary_key=True),
+                Column("x", Integer, key="x2"),
+            )
+            m2.reflect(
+                connection, extend_existing=True, autoload_replace=False
+            )
+            eq_(set(t2.columns.keys()), {"x2", "y", "id"})
+        elif extend_type.autoload:
+            t2 = Table(
+                "t",
+                m2,
+                Column("id", Integer, primary_key=True),
+                Column("x", Integer, key="x2"),
+                autoload_with=connection,
+                extend_existing=True,
+            )
+            eq_(set(t2.columns.keys()), {"x2", "y", "id"})
+        elif extend_type.plain_table:
+            Table(
+                "t",
+                m2,
+                Column("id", Integer, primary_key=True),
+                Column("x", Integer, key="x2"),
+            )
+            with expect_warnings(
+                'Column with user-specified key "x2" is being replaced with '
+                'plain named column "x", key "x2" is being removed.'
+            ):
+                t2 = Table(
+                    "t",
+                    m2,
+                    Column("id", Integer, primary_key=True),
+                    Column("x", Integer),
+                    Column("y", Integer),
+                    extend_existing=True,
+                )
+            eq_(set(t2.columns.keys()), {"x", "y", "id"})
+
     def test_extend_existing_reflect_all_dont_dupe_index(
         self, connection, metadata
     ):
index 475b5e39bb80e3e653df85d136aeeedc664e1bb5..ead6bcc4eda5a1d72347c6772b09c73d76e925f0 100644 (file)
@@ -46,6 +46,7 @@ from sqlalchemy.testing import assert_raises
 from sqlalchemy.testing import assert_raises_message
 from sqlalchemy.testing import assertions
 from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
 from sqlalchemy.testing import expect_raises_message
 from sqlalchemy.testing import expect_warnings
 from sqlalchemy.testing import fixtures
@@ -936,12 +937,11 @@ class DeclarativeMultiBaseTest(
                 id = Column(Integer, primary_key=True)
 
     def test_column_named_twice(self):
-        with assertions.expect_deprecated(
-            "A column with name 'x' is already present in table 'foo'"
-        ), expect_warnings(
-            "On class 'Foo', Column object 'x' named directly multiple times, "
-            "only one will be used: x, y",
-        ):
+        with expect_warnings(
+            "On class 'Foo', Column object 'x' named directly multiple "
+            "times, only one will be used: x, y. Consider using "
+            "orm.synonym instead"
+        ), expect_raises(exc.DuplicateColumnError):
 
             class Foo(Base):
                 __tablename__ = "foo"
@@ -951,12 +951,11 @@ class DeclarativeMultiBaseTest(
                 y = Column("x", Integer)
 
     def test_column_repeated_under_prop(self):
-        with assertions.expect_deprecated(
-            "A column with name 'x' is already present in table 'foo'"
-        ), expect_warnings(
-            "On class 'Foo', Column object 'x' named directly multiple times, "
-            "only one will be used: x, y, z",
-        ):
+        with expect_warnings(
+            "On class 'Foo', Column object 'x' named directly multiple "
+            "times, only one will be used: x, y, z. Consider using "
+            "orm.synonym instead"
+        ), expect_raises(exc.DuplicateColumnError):
 
             class Foo(Base):
                 __tablename__ = "foo"
index e50aa236a9487410c3dd6ee881b67723b6bbc7ea..56bb89541eb63351a1c310ba9f7694f50d8cee35 100644 (file)
@@ -59,6 +59,7 @@ from sqlalchemy.testing import is_
 from sqlalchemy.testing import is_false
 from sqlalchemy.testing import is_true
 from sqlalchemy.testing import mock
+from sqlalchemy.testing import Variation
 from sqlalchemy.testing.assertions import expect_warnings
 
 
@@ -1825,8 +1826,12 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
         is_(t2.c.contains_column(g), False)
 
     def test_table_ctor_duplicated_column_name(self):
-        def go():
-            return Table(
+        # when it will raise
+        with testing.expect_raises_message(
+            exc.ArgumentError,
+            "A column with name 'col' is already present in table 't'",
+        ):
+            Table(
                 "t",
                 MetaData(),
                 Column("a", Integer),
@@ -1834,39 +1839,42 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
                 Column("col", String),
             )
 
-        with testing.expect_deprecated(
-            "A column with name 'col' is already present in table 't'",
-        ):
-            t = go()
-        is_true(isinstance(t.c.col.type, String))
-        # when it will raise
-        # with testing.expect_raises_message(
-        #     exc.ArgumentError,
-        #     "A column with name 'col' is already present in table 't'",
-        # ):
-        #     go()
-
     def test_append_column_existing_name(self):
         t = Table("t", MetaData(), Column("col", Integer))
 
-        with testing.expect_deprecated(
-            "A column with name 'col' is already present in table 't'",
+        with testing.expect_raises_message(
+            exc.DuplicateColumnError,
+            r"A column with name 'col' is already present in table 't'. "
+            r"Specify replace_existing=True to Table.append_column\(\) to "
+            r"replace an existing column.",
         ):
             t.append_column(Column("col", String))
-        is_true(isinstance(t.c.col.type, String))
-        # when it will raise
-        # col = t.c.col
-        # with testing.expect_raises_message(
-        #     exc.ArgumentError,
-        #     "A column with name 'col' is already present in table 't'",
-        # ):
-        #     t.append_column(Column("col", String))
-        # is_true(t.c.col is col)
-
-    def test_append_column_replace_existing(self):
-        t = Table("t", MetaData(), Column("col", Integer))
-        t.append_column(Column("col", String), replace_existing=True)
-        is_true(isinstance(t.c.col.type, String))
+
+    def test_append_column_existing_key(self):
+        t = Table("t", MetaData(), Column("col", Integer, key="c2"))
+
+        with testing.expect_raises_message(
+            exc.DuplicateColumnError,
+            r"A column with key 'c2' is already present in table 't'. "
+            r"Specify replace_existing=True to Table.append_column\(\) "
+            r"to replace an existing column.",
+        ):
+            t.append_column(Column("col", String, key="c2"))
+
+    @testing.variation("field", ["name", "key"])
+    def test_append_column_replace_existing(self, field: Variation):
+        if field.name:
+            t = Table("t", MetaData(), Column("col", Integer))
+            t.append_column(Column("col", String), replace_existing=True)
+            is_true(isinstance(t.c.col.type, String))
+        elif field.key:
+            t = Table("t", MetaData(), Column("col", Integer, key="c2"))
+            t.append_column(
+                Column("col", String, key="c2"), replace_existing=True
+            )
+            is_true(isinstance(t.c.c2.type, String))
+        else:
+            field.fail()
 
     def test_autoincrement_replace(self):
         m = MetaData()
@@ -2658,7 +2666,7 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
             assert t2.index("CREATE TABLE someschema.table2") > -1
 
 
-class UseExistingTest(fixtures.TablesTest):
+class UseExistingTest(testing.AssertsCompiledSQL, fixtures.TablesTest):
     @classmethod
     def define_tables(cls, metadata):
         Table(
@@ -2678,6 +2686,121 @@ class UseExistingTest(fixtures.TablesTest):
     def empty_meta(self):
         return MetaData()
 
+    @testing.variation(
+        "scenario",
+        [
+            "inplace",
+            "inplace_ee",
+            "separate_ee_key_first",
+            "separate_ee_key_second",
+            "separate_ee_key_append_no_replace",
+            "separate_ee_key_append_replace",
+        ],
+    )
+    @testing.variation("both_have_keys", [True, False])
+    def test_table_w_two_same_named_columns(
+        self, empty_meta, scenario: Variation, both_have_keys: Variation
+    ):
+
+        if scenario.inplace:
+            with expect_raises_message(
+                exc.DuplicateColumnError,
+                "A column with name 'b' is already present in table 'users'.",
+            ):
+                t1 = Table(
+                    "users",
+                    empty_meta,
+                    Column("a", String),
+                    Column("b", String, key="b1" if both_have_keys else None),
+                    Column("b", String, key="b2"),
+                )
+            return
+        elif scenario.inplace_ee:
+            t1 = Table(
+                "users",
+                empty_meta,
+                Column("a", String),
+                Column("b", String, key="b1" if both_have_keys else None),
+                Column("b", String, key="b2"),
+                extend_existing=True,
+            )
+        elif scenario.separate_ee_key_first:
+            t1 = Table(
+                "users",
+                empty_meta,
+                Column("a", String),
+                Column("b", String, key="b2"),
+            )
+
+            expected_warnings = (
+                [
+                    'Column with user-specified key "b2" is being '
+                    'replaced with plain named column "b", key "b2" '
+                    "is being removed."
+                ]
+                if not both_have_keys
+                else []
+            )
+            with expect_warnings(*expected_warnings):
+                t1 = Table(
+                    "users",
+                    empty_meta,
+                    Column("a", String),
+                    Column("b", String, key="b1" if both_have_keys else None),
+                    extend_existing=True,
+                )
+        elif scenario.separate_ee_key_second:
+            t1 = Table(
+                "users",
+                empty_meta,
+                Column("a", String),
+                Column("b", String, key="b1" if both_have_keys else None),
+            )
+            t1 = Table(
+                "users",
+                empty_meta,
+                Column("a", String),
+                Column("b", String, key="b2"),
+                extend_existing=True,
+            )
+        elif scenario.separate_ee_key_append_no_replace:
+            t1 = Table(
+                "users",
+                empty_meta,
+                Column("a", String),
+                Column("b", String, key="b1" if both_have_keys else None),
+            )
+            with expect_raises_message(
+                exc.DuplicateColumnError,
+                r"A column with name 'b' is already present in table 'users'. "
+                r"Specify replace_existing=True to Table.append_column\(\) "
+                r"to replace an existing column.",
+            ):
+                t1.append_column(Column("b", String, key="b2"))
+            return
+        elif scenario.separate_ee_key_append_replace:
+            t1 = Table(
+                "users",
+                empty_meta,
+                Column("a", String),
+                Column("b", String, key="b1" if both_have_keys else None),
+            )
+            t1.append_column(
+                Column("b", String, key="b2"), replace_existing=True
+            )
+
+        else:
+            scenario.fail()
+
+        if scenario.separate_ee_key_first:
+            if both_have_keys:
+                eq_(t1.c.keys(), ["a", "b1"])
+            else:
+                eq_(t1.c.keys(), ["a", "b"])
+        else:
+            eq_(t1.c.keys(), ["a", "b2"])
+        self.assert_compile(select(t1), "SELECT users.a, users.b FROM users")
+
     def test_exception_no_flags(self, existing_meta):
         def go():
             Table(