--- /dev/null
+.. change::
+ :tags: change, sql
+ :tickets: 5526
+
+ The :class:`_schema.Table` class now raises a deprecation warning
+ when columns with the same name are defined. To replace a column a new
+ parameter :paramref:`_schema.Table.append_column.replace_existing` was
+ added to the :meth:`_schema.Table.append_column` method.
+
+ The :meth:`_expression.ColumnCollection.contains_column` will now
+ raises an error when called with a string, suggesting the caller
+ to use ``in`` instead.
if not super_history_mapper:
local_mapper.local_table.append_column(
- Column("version", Integer, default=1, nullable=False)
+ Column("version", Integer, default=1, nullable=False),
+ replace_existing=True,
)
local_mapper.add_property(
"version", local_mapper.local_table.c.version
if where is not None:
self.where = coercions.expect(roles.StatementOptionRole, where)
- def _set_parent(self, table):
+ def _set_parent(self, table, **kw):
super(ExcludeConstraint, self)._set_parent(table)
self._render_exprs = [
if col.key in table.primary_key:
col.primary_key = True
- table.append_column(col)
+ table.append_column(col, replace_existing=True)
def _reflect_col_sequence(self, col_d, colargs):
if "sequence" in col_d:
if "__mapper__" in cls.__dict__:
if isinstance(value, Column):
_undefer_column_name(key, value)
- cls.__table__.append_column(value)
+ cls.__table__.append_column(value, replace_existing=True)
cls.__mapper__.add_property(key, value)
elif isinstance(value, ColumnProperty):
for col in value.columns:
if isinstance(col, Column) and col.table is None:
_undefer_column_name(key, col)
- cls.__table__.append_column(col)
+ cls.__table__.append_column(col, replace_existing=True)
cls.__mapper__.add_property(key, value)
elif isinstance(value, MapperProperty):
cls.__mapper__.add_property(key, value)
"""
- def _set_parent(self, parent):
+ def _set_parent(self, parent, **kw):
"""Associate with this SchemaEvent's parent object."""
- def _set_parent_with_dispatch(self, parent):
+ def _set_parent_with_dispatch(self, parent, **kw):
self.dispatch.before_parent_attach(self, parent)
- self._set_parent(parent)
+ self._set_parent(parent, **kw)
self.dispatch.after_parent_attach(self, parent)
)
def contains_column(self, col):
- return col in self._colset
+ """Checks if a column object exists in this collection"""
+ if col not in self._colset:
+ if isinstance(col, util.string_types):
+ raise exc.ArgumentError(
+ "contains_column cannot be used with string arguments. "
+ "Use ``col_name in table.c`` instead."
+ )
+ return False
+ else:
+ return True
def as_immutable(self):
return ImmutableColumnCollection(self)
"""
def add(self, column, key=None):
+
if key is not None and column.key != key:
raise exc.ArgumentError(
"DedupeColumnCollection requires columns be under "
__visit_name__ = "schema_item"
- def _init_items(self, *args):
+ def _init_items(self, *args, **kw):
"""Initialize the list of child items for this SchemaItem."""
-
for item in args:
if item is not None:
try:
replace_context=err,
)
else:
- spwd(self)
+ spwd(self, **kw)
def __repr__(self):
return util.generic_repr(self, omit_kwarg=["info"])
:class:`.PrimaryKeyConstraint`, and
:class:`_schema.ForeignKeyConstraint`.
- :param autoload: Defaults to False, unless
+ :param autoload: Defaults to ``False``, unless
:paramref:`_schema.Table.autoload_with`
- is set in which case it defaults to True; :class:`_schema.Column`
- objects
+ is set in which case it defaults to ``True``;
+ :class:`_schema.Column` objects
for this table should be reflected from the database, possibly
- augmenting or replacing existing :class:`_schema.Column`
- objects that were
- explicitly specified.
+ augmenting objects that were explicitly specified.
+ :class:`_schema.Column` and other objects explicitly set on the
+ table will replace corresponding reflected objects.
.. deprecated:: 1.4
schema = metadata.schema
elif schema is BLANK_SCHEMA:
schema = None
- keep_existing = kw.pop("keep_existing", False)
- extend_existing = kw.pop("extend_existing", False)
+ keep_existing = kw.get("keep_existing", False)
+ extend_existing = kw.get("extend_existing", False)
if keep_existing and extend_existing:
msg = "keep_existing and extend_existing are mutually exclusive."
table._init(name, metadata, *args, **kw)
table.dispatch.after_parent_attach(table, metadata)
return table
- except:
+ except Exception:
with util.safe_reraise():
metadata._remove_table(name, schema)
self.indexes = set()
self.constraints = set()
- self._columns = DedupeColumnCollection()
PrimaryKeyConstraint(
_implicit_generated=True
)._set_parent_with_dispatch(self)
autoload = kwargs.pop("autoload", autoload_with is not None)
# this argument is only used with _init_existing()
kwargs.pop("autoload_replace", True)
+ keep_existing = kwargs.pop("keep_existing", False)
+ extend_existing = kwargs.pop("extend_existing", False)
_extend_on = kwargs.pop("_extend_on", None)
resolve_fks = kwargs.pop("resolve_fks", True)
# initialize all the column, etc. objects. done after reflection to
# allow user-overrides
- self._init_items(*args)
+
+ self._init_items(
+ *args,
+ allow_replacements=extend_existing or keep_existing or autoload
+ )
def _autoload(
self,
autoload_replace = kwargs.pop("autoload_replace", True)
schema = kwargs.pop("schema", None)
_extend_on = kwargs.pop("_extend_on", None)
+ # these arguments are only used with _init()
+ kwargs.pop("extend_existing", False)
+ kwargs.pop("keep_existing", False)
if schema and schema != self.schema:
raise exc.ArgumentError(
"""
self._extra_dependencies.add(table)
- def append_column(self, column):
+ def append_column(self, column, replace_existing=False):
"""Append a :class:`_schema.Column` to this :class:`_schema.Table`.
The "key" of the newly added :class:`_schema.Column`, i.e. the
emitted for an already-existing table that doesn't contain
the newly added column.
+ :param replace_existing: When ``True``, allows replacing existing
+ columns. When ``False``, the default, an warning will be raised
+ if a column with the same ``.key`` already exists. A future
+ version of sqlalchemy will instead rise a warning.
+
+ .. versionadded:: 1.4.0
"""
- column._set_parent_with_dispatch(self)
+ column._set_parent_with_dispatch(
+ self, allow_replacements=replace_existing
+ )
def append_constraint(self, constraint):
"""Append a :class:`_schema.Constraint` to this
constraint._set_parent_with_dispatch(self)
- def _set_parent(self, metadata):
+ def _set_parent(self, metadata, **kw):
metadata._add_table(self.name, self.schema, self)
self.metadata = metadata
+ ["%s=%s" % (k, repr(getattr(self, k))) for k in kwarg]
)
- def _set_parent(self, table):
+ def _set_parent(self, table, allow_replacements=True):
if not self.name:
raise exc.ArgumentError(
"Column must be constructed with a non-blank name or "
if self.key in table._columns:
col = table._columns.get(self.key)
if col is not self:
+ if not allow_replacements:
+ util.warn_deprecated(
+ "A column with name '%s' is already present "
+ "in table '%s'. Please use method "
+ ":meth:`_schema.Table.append_column` with the "
+ "parameter ``replace_existing=True`` to replace an "
+ "existing column." % (self.key, table.name),
+ "1.4",
+ )
for fk in col.foreign_keys:
table.foreign_keys.remove(fk)
if fk.constraint in table.constraints:
_column = self._colspec
return _column
- def _set_parent(self, column):
+ def _set_parent(self, column, **kw):
if self.parent is not None and self.parent is not column:
raise exc.InvalidRequestError(
"This ForeignKey already has a parent !"
def __init__(self, for_update=False):
self.for_update = for_update
- def _set_parent(self, column):
+ def _set_parent(self, column, **kw):
self.column = column
if self.for_update:
self.column.onupdate = self
self, bind=self.bind
)
- def _set_parent(self, column):
+ def _set_parent(self, column, **kw):
super(Sequence, self)._set_parent(column)
column._on_table_attach(self._set_table)
n.for_update = for_update
return n
- def _set_parent(self, column):
+ def _set_parent(self, column, **kw):
self.column = column
if self.for_update:
self.column.server_onupdate = self
"mean to call table.append_constraint(constraint) ?"
)
- def _set_parent(self, parent):
+ def _set_parent(self, parent, **kw):
self.parent = parent
parent.constraints.add(self)
for col in self._pending_colargs
]
- def _set_parent(self, table):
+ def _set_parent(self, table, **kw):
for col in self._col_expressions(table):
if col is not None:
self.columns.add(col)
"""
- def _set_parent(self, table):
+ def _set_parent(self, table, **kw):
Constraint._set_parent(self, table)
ColumnCollectionMixin._set_parent(self, table)
def _col_description(self):
return ", ".join(self.column_keys)
- def _set_parent(self, table):
+ def _set_parent(self, table, **kw):
Constraint._set_parent(self, table)
try:
self._implicit_generated = kw.pop("_implicit_generated", False)
super(PrimaryKeyConstraint, self).__init__(*columns, **kw)
- def _set_parent(self, table):
+ def _set_parent(self, table, **kw):
super(PrimaryKeyConstraint, self)._set_parent(table)
if table.primary_key is not self:
if table is not None:
self._set_parent(table)
- def _set_parent(self, table):
+ def _set_parent(self, table, **kw):
ColumnCollectionMixin._set_parent(self, table)
if self.table is not None and table is not self.table:
self.persisted = persisted
self.column = None
- def _set_parent(self, parent):
+ def _set_parent(self, parent, **kw):
if not isinstance(
parent.server_default, (type(None), Computed)
) or not isinstance(parent.server_onupdate, (type(None), Computed)):
self.on_null = on_null
self.column = None
- def _set_parent(self, parent):
+ def _set_parent(self, parent, **kw):
if not isinstance(
parent.server_default, (type(None), Identity)
) or not isinstance(parent.server_onupdate, type(None)):
else:
return self.name.encode("ascii", "backslashreplace")
- def append_column(self, c):
+ def append_column(self, c, **kw):
existing = c.table
if existing is not None and existing is not self:
raise exc.ArgumentError(
is_true(cc.contains_column(c1))
is_false(cc.contains_column(c3))
+ def test_contains_column_not_column(self):
+ c1, c2, c3 = sql.column("c1"), sql.column("c2"), sql.column("c3")
+ cc = self._column_collection(columns=[("c1", c1), ("c2", c2)])
+
+ is_false(cc.contains_column(c3 == 2))
+
+ with testing.expect_raises_message(
+ exc.ArgumentError,
+ "contains_column cannot be used with string arguments",
+ ):
+ cc.contains_column("c1")
+ with testing.expect_raises_message(
+ exc.ArgumentError,
+ "contains_column cannot be used with string arguments",
+ ):
+ cc.contains_column("foo")
+
def test_in(self):
col1 = sql.column("col1")
cc = self._column_collection(
),
Column("col3", Integer, Sequence("foo"), server_default="x"),
Column("col4", Integer, ColumnDefault("x"), DefaultClause("y")),
- Column(
- "col4",
- Integer,
- ColumnDefault("x"),
- DefaultClause("y"),
- DefaultClause("y", for_update=True),
- ),
Column(
"col5",
Integer,
onupdate="z",
),
)
+ tbl.append_column(
+ Column(
+ "col4",
+ Integer,
+ ColumnDefault("x"),
+ DefaultClause("y"),
+ DefaultClause("y", for_update=True),
+ ),
+ replace_existing=True,
+ )
has_(tbl, "col1", "default")
has_(tbl, "col2", "default", "server_default")
has_(tbl, "col3", "default", "server_default")
is_(t2.c.contains_column(z), True)
is_(t2.c.contains_column(g), False)
+ def test_table_ctor_duplicated_column_name(self):
+ def go():
+ return Table(
+ "t",
+ MetaData(),
+ Column("a", Integer),
+ Column("col", Integer),
+ Column("col", String),
+ )
+
+ with testing.expect_deprecated(
+ "A column with name 'col' is already present in table 't'",
+ ):
+ t = go()
+ is_true(isinstance(t.c.col.type, String))
+ # when it will raise
+ # with testing.expect_raises_message(
+ # exc.ArgumentError,
+ # "A column with name 'col' is already present in table 't'",
+ # ):
+ # go()
+
+ def test_append_column_existing_name(self):
+ t = Table("t", MetaData(), Column("col", Integer))
+
+ with testing.expect_deprecated(
+ "A column with name 'col' is already present in table 't'",
+ ):
+ t.append_column(Column("col", String))
+ is_true(isinstance(t.c.col.type, String))
+ # when it will raise
+ # col = t.c.col
+ # with testing.expect_raises_message(
+ # exc.ArgumentError,
+ # "A column with name 'col' is already present in table 't'",
+ # ):
+ # t.append_column(Column("col", String))
+ # is_true(t.c.col is col)
+
+ def test_append_column_replace_existing(self):
+ t = Table("t", MetaData(), Column("col", Integer))
+ t.append_column(Column("col", String), replace_existing=True)
+ is_true(isinstance(t.c.col.type, String))
+
def test_autoincrement_replace(self):
m = MetaData()
Column("name", String(30)),
)
- def _useexisting_fixture(self):
+ @testing.fixture
+ def existing_meta(self):
meta2 = MetaData(testing.db)
Table("users", meta2, autoload=True)
return meta2
- def _notexisting_fixture(self):
+ @testing.fixture
+ def empty_meta(self):
return MetaData(testing.db)
- def test_exception_no_flags(self):
- meta2 = self._useexisting_fixture()
-
+ def test_exception_no_flags(self, existing_meta):
def go():
- Table("users", meta2, Column("name", Unicode), autoload=True)
+ Table(
+ "users", existing_meta, Column("name", Unicode), autoload=True
+ )
assert_raises_message(
exc.InvalidRequestError,
go,
)
- def test_keep_plus_existing_raises(self):
- meta2 = self._useexisting_fixture()
+ def test_keep_plus_existing_raises(self, existing_meta):
assert_raises(
exc.ArgumentError,
Table,
"users",
- meta2,
+ existing_meta,
keep_existing=True,
extend_existing=True,
)
- def test_keep_existing_no_dupe_constraints(self):
- meta2 = self._notexisting_fixture()
+ def test_keep_existing_no_dupe_constraints(self, empty_meta):
users = Table(
"users",
- meta2,
+ empty_meta,
Column("id", Integer),
Column("name", Unicode),
UniqueConstraint("name"),
u2 = Table(
"users",
- meta2,
+ empty_meta,
Column("id", Integer),
Column("name", Unicode),
UniqueConstraint("name"),
)
eq_(len(u2.constraints), 2)
- def test_extend_existing_dupes_constraints(self):
- meta2 = self._notexisting_fixture()
+ def test_extend_existing_dupes_constraints(self, empty_meta):
users = Table(
"users",
- meta2,
+ empty_meta,
Column("id", Integer),
Column("name", Unicode),
UniqueConstraint("name"),
u2 = Table(
"users",
- meta2,
+ empty_meta,
Column("id", Integer),
Column("name", Unicode),
UniqueConstraint("name"),
# constraint got duped
eq_(len(u2.constraints), 3)
- def test_keep_existing_coltype(self):
- meta2 = self._useexisting_fixture()
+ def test_autoload_replace_column(self, empty_meta):
+ users = Table(
+ "users", empty_meta, Column("name", Unicode), autoload=True
+ )
+ assert isinstance(users.c.name.type, Unicode)
+
+ def test_keep_existing_coltype(self, existing_meta):
users = Table(
"users",
- meta2,
+ existing_meta,
Column("name", Unicode),
autoload=True,
keep_existing=True,
)
assert not isinstance(users.c.name.type, Unicode)
- def test_keep_existing_quote(self):
- meta2 = self._useexisting_fixture()
+ def test_keep_existing_quote(self, existing_meta):
users = Table(
- "users", meta2, quote=True, autoload=True, keep_existing=True
+ "users",
+ existing_meta,
+ quote=True,
+ autoload=True,
+ keep_existing=True,
)
assert not users.name.quote
- def test_keep_existing_add_column(self):
- meta2 = self._useexisting_fixture()
+ def test_keep_existing_add_column(self, existing_meta):
users = Table(
"users",
- meta2,
+ existing_meta,
Column("foo", Integer),
autoload=True,
keep_existing=True,
)
assert "foo" not in users.c
- def test_keep_existing_coltype_no_orig(self):
- meta2 = self._notexisting_fixture()
+ def test_keep_existing_coltype_no_orig(self, empty_meta):
users = Table(
"users",
- meta2,
+ empty_meta,
Column("name", Unicode),
autoload=True,
keep_existing=True,
lambda: testing.db.dialect.requires_name_normalize,
"test depends on lowercase as case insensitive",
)
- def test_keep_existing_quote_no_orig(self):
- meta2 = self._notexisting_fixture()
+ def test_keep_existing_quote_no_orig(self, empty_meta):
users = Table(
- "users", meta2, quote=True, autoload=True, keep_existing=True
+ "users", empty_meta, quote=True, autoload=True, keep_existing=True
)
assert users.name.quote
- def test_keep_existing_add_column_no_orig(self):
- meta2 = self._notexisting_fixture()
+ def test_keep_existing_add_column_no_orig(self, empty_meta):
users = Table(
"users",
- meta2,
+ empty_meta,
Column("foo", Integer),
autoload=True,
keep_existing=True,
)
assert "foo" in users.c
- def test_keep_existing_coltype_no_reflection(self):
- meta2 = self._useexisting_fixture()
+ def test_keep_existing_coltype_no_reflection(self, existing_meta):
users = Table(
- "users", meta2, Column("name", Unicode), keep_existing=True
+ "users", existing_meta, Column("name", Unicode), keep_existing=True
)
assert not isinstance(users.c.name.type, Unicode)
- def test_keep_existing_quote_no_reflection(self):
- meta2 = self._useexisting_fixture()
- users = Table("users", meta2, quote=True, keep_existing=True)
+ def test_keep_existing_quote_no_reflection(self, existing_meta):
+ users = Table("users", existing_meta, quote=True, keep_existing=True)
assert not users.name.quote
- def test_keep_existing_add_column_no_reflection(self):
- meta2 = self._useexisting_fixture()
+ def test_keep_existing_add_column_no_reflection(self, existing_meta):
users = Table(
- "users", meta2, Column("foo", Integer), keep_existing=True
+ "users", existing_meta, Column("foo", Integer), keep_existing=True
)
assert "foo" not in users.c
- def test_extend_existing_coltype(self):
- meta2 = self._useexisting_fixture()
+ def test_extend_existing_coltype(self, existing_meta):
users = Table(
"users",
- meta2,
+ existing_meta,
Column("name", Unicode),
autoload=True,
extend_existing=True,
)
assert isinstance(users.c.name.type, Unicode)
- def test_extend_existing_quote(self):
- meta2 = self._useexisting_fixture()
+ def test_extend_existing_quote(self, existing_meta):
assert_raises_message(
tsa.exc.ArgumentError,
"Can't redefine 'quote' or 'quote_schema' arguments",
Table,
"users",
- meta2,
+ existing_meta,
quote=True,
autoload=True,
extend_existing=True,
)
- def test_extend_existing_add_column(self):
- meta2 = self._useexisting_fixture()
+ def test_extend_existing_add_column(self, existing_meta):
users = Table(
"users",
- meta2,
+ existing_meta,
Column("foo", Integer),
autoload=True,
extend_existing=True,
)
assert "foo" in users.c
- def test_extend_existing_coltype_no_orig(self):
- meta2 = self._notexisting_fixture()
+ def test_extend_existing_coltype_no_orig(self, empty_meta):
users = Table(
"users",
- meta2,
+ empty_meta,
Column("name", Unicode),
autoload=True,
extend_existing=True,
lambda: testing.db.dialect.requires_name_normalize,
"test depends on lowercase as case insensitive",
)
- def test_extend_existing_quote_no_orig(self):
- meta2 = self._notexisting_fixture()
+ def test_extend_existing_quote_no_orig(self, empty_meta):
users = Table(
- "users", meta2, quote=True, autoload=True, extend_existing=True
+ "users",
+ empty_meta,
+ quote=True,
+ autoload=True,
+ extend_existing=True,
)
assert users.name.quote
- def test_extend_existing_add_column_no_orig(self):
- meta2 = self._notexisting_fixture()
+ def test_extend_existing_add_column_no_orig(self, empty_meta):
users = Table(
"users",
- meta2,
+ empty_meta,
Column("foo", Integer),
autoload=True,
extend_existing=True,
)
assert "foo" in users.c
- def test_extend_existing_coltype_no_reflection(self):
- meta2 = self._useexisting_fixture()
+ def test_extend_existing_coltype_no_reflection(self, existing_meta):
users = Table(
- "users", meta2, Column("name", Unicode), extend_existing=True
+ "users",
+ existing_meta,
+ Column("name", Unicode),
+ extend_existing=True,
)
assert isinstance(users.c.name.type, Unicode)
- def test_extend_existing_quote_no_reflection(self):
- meta2 = self._useexisting_fixture()
+ def test_extend_existing_quote_no_reflection(self, existing_meta):
assert_raises_message(
tsa.exc.ArgumentError,
"Can't redefine 'quote' or 'quote_schema' arguments",
Table,
"users",
- meta2,
+ existing_meta,
quote=True,
extend_existing=True,
)
- def test_extend_existing_add_column_no_reflection(self):
- meta2 = self._useexisting_fixture()
+ def test_extend_existing_add_column_no_reflection(self, existing_meta):
users = Table(
- "users", meta2, Column("foo", Integer), extend_existing=True
+ "users",
+ existing_meta,
+ Column("foo", Integer),
+ extend_existing=True,
)
assert "foo" in users.c