--- /dev/null
+.. change::
+ :tags: bug, sql
+ :tickets: 5001
+
+ Fixed issue where when constructing constraints from ORM-bound columns,
+ primarily :class:`.ForeignKey` objects but also :class:`.UniqueConstraint`,
+ :class:`.CheckConstraint` and others, the ORM-level
+ :class:`.InstrumentedAttribute` is discarded entirely, and all ORM-level
+ annotations from the columns are removed; this is so that the constraints
+ are still fully pickleable without the ORM-level entities being pulled in.
+ These annotations are not necessary to be present at the schema/metadata
+ level.
raise exc.ArgumentError(msg, code=code)
+class _Deannotate(object):
+ def _post_coercion(self, resolved, **kw):
+ from .util import _deep_deannotate
+
+ return _deep_deannotate(resolved)
+
+
class _StringOnly(object):
def _resolve_for_clause_element(self, element, argname=None, **kw):
return self._literal_coercion(element, **kw)
return elements._truncated_label(element)
-class DDLExpressionImpl(_CoerceLiterals, RoleImpl, roles.DDLExpressionRole):
+class DDLExpressionImpl(
+ _Deannotate, _CoerceLiterals, RoleImpl, roles.DDLExpressionRole
+):
_coerce_consts = True
class DDLConstraintColumnImpl(
- _ReturnsStringKey, RoleImpl, roles.DDLConstraintColumnRole
+ _Deannotate, _ReturnsStringKey, RoleImpl, roles.DDLConstraintColumnRole
):
pass
+class DDLReferredColumnImpl(DDLConstraintColumnImpl):
+ pass
+
+
class LimitOffsetImpl(RoleImpl, roles.LimitOffsetRole):
def _implicit_coercions(self, element, resolved, argname=None, **kw):
if resolved is None:
# refer to BinaryExpression directly and pass strings
if isinstance(operator, util.string_types):
operator = operators.custom_op(operator)
- self._orig = (left, right)
+ self._orig = (hash(left), hash(right))
self.left = left.self_group(against=operator)
self.right = right.self_group(against=operator)
self.operator = operator
def __bool__(self):
if self.operator in (operator.eq, operator.ne):
- return self.operator(hash(self._orig[0]), hash(self._orig[1]))
+ return self.operator(self._orig[0], self._orig[1])
else:
raise TypeError("Boolean value of this clause is not defined")
return key, e
-class ColumnClause(roles.LabeledColumnExprRole, Immutable, ColumnElement):
+class ColumnClause(
+ roles.DDLReferredColumnRole,
+ roles.LabeledColumnExprRole,
+ Immutable,
+ ColumnElement,
+):
"""Represents a column expression from any textual string.
The :class:`.ColumnClause`, a lightweight analogue to the
class DDLConstraintColumnRole(SQLRole):
- _role_name = "String column name or column object for DDL constraint"
+ _role_name = "String column name or column expression for DDL constraint"
+
+
+class DDLReferredColumnRole(DDLConstraintColumnRole):
+ _role_name = (
+ "String column name or Column object for DDL foreign key constraint"
+ )
"""
- self._colspec = column
+ self._colspec = coercions.expect(roles.DDLReferredColumnRole, column)
+
if isinstance(self._colspec, util.string_types):
self._table_column = None
else:
- if hasattr(self._colspec, "__clause_element__"):
- self._table_column = self._colspec.__clause_element__()
- else:
- self._table_column = self._colspec
+ self._table_column = self._colspec
- if not isinstance(self._table_column, ColumnClause):
- raise exc.ArgumentError(
- "String, Column, or Column-bound argument "
- "expected, got %r" % self._table_column
- )
- elif not isinstance(
+ if not isinstance(
self._table_column.table, (util.NoneType, TableClause)
):
raise exc.ArgumentError(
raise NotImplementedError()
-def _to_schema_column(element):
- if hasattr(element, "__clause_element__"):
- element = element.__clause_element__()
- if not isinstance(element, Column):
- raise exc.ArgumentError("schema.Column object expected")
- return element
-
-
-def _to_schema_column_or_string(element):
- if element is None:
- return element
- elif hasattr(element, "__clause_element__"):
- element = element.__clause_element__()
- if not isinstance(element, util.string_types + (ColumnElement,)):
- msg = "Element %r is not a string name or column element"
- raise exc.ArgumentError(msg % element)
- return element
-
-
class ColumnCollectionMixin(object):
columns = None
_autoattach = kw.pop("_autoattach", True)
self._column_flag = kw.pop("_column_flag", False)
self.columns = DedupeColumnCollection()
- self._pending_colargs = [
- _to_schema_column_or_string(c) for c in columns
- ]
+
+ processed_expressions = kw.pop("_gather_expressions", None)
+ if processed_expressions is not None:
+ self._pending_colargs = []
+ for (
+ expr,
+ column,
+ strname,
+ add_element,
+ ) in coercions.expect_col_expression_collection(
+ roles.DDLConstraintColumnRole, columns
+ ):
+ self._pending_colargs.append(add_element)
+ processed_expressions.append(expr)
+ else:
+ self._pending_colargs = [
+ coercions.expect(roles.DDLConstraintColumnRole, column)
+ for column in columns
+ ]
+
if _autoattach and self._pending_colargs:
self._check_attach()
"""
self.sqltext = coercions.expect(roles.DDLExpressionRole, sqltext)
-
columns = []
visitors.traverse(self.sqltext, {}, {"column": columns.append})
"""
self.table = table = None
- columns = []
- processed_expressions = []
- for (
- expr,
- column,
- strname,
- add_element,
- ) in coercions.expect_col_expression_collection(
- roles.DDLConstraintColumnRole, expressions
- ):
- columns.append(add_element)
- processed_expressions.append(expr)
-
- self.expressions = processed_expressions
self.name = quoted_name(name, kw.pop("quote", None))
self.unique = kw.pop("unique", False)
_column_flag = kw.pop("_column_flag", False)
self._validate_dialect_kwargs(kw)
+ self.expressions = []
# will call _set_parent() if table-bound column
# objects are present
ColumnCollectionMixin.__init__(
- self, *columns, _column_flag=_column_flag
+ self,
+ *expressions,
+ _column_flag=_column_flag,
+ _gather_expressions=self.expressions
)
if table is not None:
from sqlalchemy.schema import CreateIndex
from sqlalchemy.schema import DropIndex
from sqlalchemy.sql import naming
+from sqlalchemy.sql import operators
from sqlalchemy.sql.elements import _NONE_NAME
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
not_a_col = bindparam("x")
assert_raises_message(
exc.ArgumentError,
- "String, Column, or Column-bound argument expected, got Bind",
+ "String column name or Column object for DDL foreign "
+ "key constraint expected, got .*Bind",
ForeignKey,
not_a_col,
)
assert_raises_message(
exc.ArgumentError,
- "String, Column, or Column-bound argument expected, got Bind",
+ "String column name or Column object for DDL foreign "
+ "key constraint expected, got .*Foo",
ForeignKey,
Foo(),
)
idx = Index("bar", MyThing(), t.c.y)
- is_(idx.expressions[0], expr1)
+ is_true(idx.expressions[0].compare(expr1))
is_(idx.expressions[1], t.c.y)
def test_table_references(self):
assert_raises_message(
exc.ArgumentError,
- r"String column name or column object for DDL constraint "
+ r"String column name or column expression for DDL constraint "
r"expected, got .*SomeClass",
Index,
"foo",
SomeClass(),
)
+ @testing.fixture
+ def no_pickle_annotated(self):
+ class NoPickle(object):
+ def __reduce__(self):
+ raise NotImplementedError()
+
+ class ClauseElement(operators.ColumnOperators):
+ def __init__(self, col):
+ self.col = col._annotate({"bar": NoPickle()})
+
+ def __clause_element__(self):
+ return self.col
+
+ def operate(self, op, *other, **kwargs):
+ return self.col.operate(op, *other, **kwargs)
+
+ m = MetaData()
+ t = Table("t", m, Column("q", Integer))
+ return t, ClauseElement(t.c.q)
+
+ def test_pickle_fk_annotated_col(self, no_pickle_annotated):
+
+ t, q_col = no_pickle_annotated
+
+ t2 = Table("t2", t.metadata, Column("p", ForeignKey(q_col)))
+ assert t2.c.p.references(t.c.q)
+
+ m2 = pickle.loads(pickle.dumps(t.metadata))
+
+ m2_t, m2_t2 = m2.tables["t"], m2.tables["t2"]
+
+ is_true(m2_t2.c.p.references(m2_t.c.q))
+
+ def test_pickle_uq_annotated_col(self, no_pickle_annotated):
+ t, q_col = no_pickle_annotated
+
+ t.append_constraint(UniqueConstraint(q_col))
+
+ m2 = pickle.loads(pickle.dumps(t.metadata))
+
+ const = [
+ c
+ for c in m2.tables["t"].constraints
+ if isinstance(c, UniqueConstraint)
+ ][0]
+
+ is_true(const.columns[0].compare(t.c.q))
+
+ def test_pickle_idx_expr_annotated_col(self, no_pickle_annotated):
+ t, q_col = no_pickle_annotated
+
+ expr = q_col > 5
+ t.append_constraint(Index("conditional_index", expr))
+
+ m2 = pickle.loads(pickle.dumps(t.metadata))
+
+ const = list(m2.tables["t"].indexes)[0]
+
+ is_true(const.expressions[0].compare(expr))
+
+ def test_pickle_ck_binary_annotated_col(self, no_pickle_annotated):
+ t, q_col = no_pickle_annotated
+
+ ck = CheckConstraint(q_col > 5)
+ t.append_constraint(ck)
+
+ m2 = pickle.loads(pickle.dumps(t.metadata))
+ const = [
+ c
+ for c in m2.tables["t"].constraints
+ if isinstance(c, CheckConstraint)
+ ][0]
+ is_true(const.sqltext.compare(ck.sqltext))
+
class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):