--- /dev/null
+.. change::
+ :tags: schema, postgresql
+ :tickets: 5677
+
+ Added support for comments on :class:`.Constraint` objects, including
+ DDL and reflection; the field is added to the base :class:`.Constraint`
+ class and corresponding constructors; however, PostgreSQL is the only
+ included backend to support the feature right now.
+ See parameters such as :paramref:`.ForeignKeyConstraint.comment`,
+ :paramref:`.UniqueConstraint.comment` or
+ :paramref:`.CheckConstraint.comment`.
{"name": spec["name"], "sqltext": spec["sqltext"]}
for spec in parsed_state.ck_constraints
]
+ cks.sort(key=lambda d: d["name"] or "~") # sort None as last
return cks if cks else ReflectionDefaults.check_constraints()
@reflection.cache
create, prefix=prefix, **kw
)
+ def _can_comment_on_constraint(self, ddl_instance):
+ constraint = ddl_instance.element
+ if constraint.name is None:
+ raise exc.CompileError(
+ f"Can't emit COMMENT ON for constraint {constraint!r}: "
+ "it has no name"
+ )
+ if constraint.table is None:
+ raise exc.CompileError(
+ f"Can't emit COMMENT ON for constraint {constraint!r}: "
+ "it has no associated table"
+ )
+
+ def visit_set_constraint_comment(self, create, **kw):
+ self._can_comment_on_constraint(create)
+ return "COMMENT ON CONSTRAINT %s ON %s IS %s" % (
+ self.preparer.format_constraint(create.element),
+ self.preparer.format_table(create.element.table),
+ self.sql_compiler.render_literal_value(
+ create.element.comment, sqltypes.String()
+ ),
+ )
+
+ def visit_drop_constraint_comment(self, drop, **kw):
+ self._can_comment_on_constraint(drop)
+ return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % (
+ self.preparer.format_constraint(drop.element),
+ self.preparer.format_table(drop.element.table),
+ )
+
class PGTypeCompiler(compiler.GenericTypeCompiler):
def visit_TSVECTOR(self, type_, **kw):
postfetch_lastrowid = False
supports_comments = True
+ supports_constraint_comments = True
supports_default_values = True
supports_default_metavalue = True
sql.func.generate_subscripts(
pg_catalog.pg_constraint.c.conkey, 1
).label("ord"),
+ pg_catalog.pg_description.c.description,
+ )
+ .outerjoin(
+ pg_catalog.pg_description,
+ pg_catalog.pg_description.c.objoid
+ == pg_catalog.pg_constraint.c.oid,
)
.where(
pg_catalog.pg_constraint.c.contype == bindparam("contype"),
select(
con_sq.c.conrelid,
con_sq.c.conname,
+ con_sq.c.description,
pg_catalog.pg_attribute.c.attname,
)
.select_from(pg_catalog.pg_attribute)
attr_sq.c.conrelid,
sql.func.array_agg(attr_sq.c.attname).label("cols"),
attr_sq.c.conname,
+ sql.func.min(attr_sq.c.description).label("description"),
)
.group_by(attr_sq.c.conrelid, attr_sq.c.conname)
.order_by(attr_sq.c.conrelid, attr_sq.c.conname)
)
result_by_oid = defaultdict(list)
- for oid, cols, constraint_name in result:
- result_by_oid[oid].append((cols, constraint_name))
+ for oid, cols, constraint_name, comment in result:
+ result_by_oid[oid].append((cols, constraint_name, comment))
for oid, tablename in batch:
for_oid = result_by_oid.get(oid, ())
if for_oid:
- for cols, constraint in for_oid:
- yield tablename, cols, constraint
+ for cols, constraint, comment in for_oid:
+ yield tablename, cols, constraint, comment
else:
- yield tablename, None, None
+ yield tablename, None, None, None
@reflection.cache
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
{
"constrained_columns": [] if cols is None else cols,
"name": pk_name,
+ "comment": comment,
}
if pk_name is not None
else default(),
)
- for (table_name, cols, pk_name) in result
+ for table_name, cols, pk_name, comment in result
)
@reflection.cache
else_=None,
),
pg_namespace_ref.c.nspname,
+ pg_catalog.pg_description.c.description,
)
.select_from(pg_catalog.pg_class)
.outerjoin(
pg_namespace_ref,
pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid,
)
+ .outerjoin(
+ pg_catalog.pg_description,
+ pg_catalog.pg_description.c.objoid
+ == pg_catalog.pg_constraint.c.oid,
+ )
.order_by(
pg_catalog.pg_class.c.relname,
pg_catalog.pg_constraint.c.conname,
fkeys = defaultdict(list)
default = ReflectionDefaults.foreign_keys
- for table_name, conname, condef, conschema in result:
+ for table_name, conname, condef, conschema, comment in result:
# ensure that each table has an entry, even if it has
# no foreign keys
if conname is None:
"referred_table": referred_table,
"referred_columns": referred_columns,
"options": options,
+ "comment": comment,
}
table_fks.append(fkey_d)
return fkeys.items()
# each table can have multiple unique constraints
uniques = defaultdict(list)
default = ReflectionDefaults.unique_constraints
- for (table_name, cols, con_name) in result:
+ for table_name, cols, con_name, comment in result:
# ensure a list is created for each table. leave it empty if
# the table has no unique cosntraint
if con_name is None:
{
"column_names": cols,
"name": con_name,
+ "comment": comment,
}
)
return uniques.items()
),
else_=None,
),
+ pg_catalog.pg_description.c.description,
)
.select_from(pg_catalog.pg_class)
.outerjoin(
pg_catalog.pg_constraint.c.contype == "c",
),
)
+ .outerjoin(
+ pg_catalog.pg_description,
+ pg_catalog.pg_description.c.objoid
+ == pg_catalog.pg_constraint.c.oid,
+ )
+ .order_by(
+ pg_catalog.pg_class.c.relname,
+ pg_catalog.pg_constraint.c.conname,
+ )
.where(self._pg_class_relkind_condition(relkinds))
)
query = self._pg_class_filter_scope_schema(query, schema, scope)
check_constraints = defaultdict(list)
default = ReflectionDefaults.check_constraints
- for table_name, check_name, src in result:
+ for table_name, check_name, src, comment in result:
# only two cases for check_name and src: both null or both defined
if check_name is None and src is None:
check_constraints[(schema, table_name)] = default()
sqltext = re.compile(
r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL
).sub(r"\1", m.group(1))
- entry = {"name": check_name, "sqltext": sqltext}
+ entry = {
+ "name": check_name,
+ "sqltext": sqltext,
+ "comment": comment,
+ }
if m and m.group(2):
entry["dialect_options"] = {"not_valid": True}
)
CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?" r"CHECK *\( *(.+) *\),? *"
- check_constraints = []
+ cks = []
# NOTE: we aren't using re.S here because we actually are
# taking advantage of each CHECK constraint being all on one
# line in the table definition in order to delineate. This
if name:
name = re.sub(r'^"|"$', "", name)
- check_constraints.append({"sqltext": match.group(2), "name": name})
-
- if check_constraints:
- return check_constraints
+ cks.append({"sqltext": match.group(2), "name": name})
+ cks.sort(key=lambda d: d["name"] or "~") # sort None as last
+ if cks:
+ return cks
else:
return ReflectionDefaults.check_constraints()
preparer = compiler.IdentifierPreparer
supports_alter = True
supports_comments = False
+ supports_constraint_comments = False
inline_comments = False
supports_statement_cache = True
object"""
-class ReflectedCheckConstraint(TypedDict):
+class ReflectedConstraint(TypedDict):
+ """Dictionary representing the reflected elements corresponding to
+ :class:`.Constraint`
+
+ A base class for all constraints
+ """
+
+ name: Optional[str]
+ """constraint name"""
+
+ comment: NotRequired[Optional[str]]
+ """comment for the constraint, if present"""
+
+
+class ReflectedCheckConstraint(ReflectedConstraint):
"""Dictionary representing the reflected elements corresponding to
:class:`.CheckConstraint`.
"""
- name: Optional[str]
- """constraint name"""
-
sqltext: str
"""the check constraint's SQL expression"""
object"""
-class ReflectedUniqueConstraint(TypedDict):
+class ReflectedUniqueConstraint(ReflectedConstraint):
"""Dictionary representing the reflected elements corresponding to
:class:`.UniqueConstraint`.
"""
- name: Optional[str]
- """constraint name"""
-
column_names: List[str]
"""column names which comprise the constraint"""
object"""
-class ReflectedPrimaryKeyConstraint(TypedDict):
+class ReflectedPrimaryKeyConstraint(ReflectedConstraint):
"""Dictionary representing the reflected elements corresponding to
:class:`.PrimaryKeyConstraint`.
"""
- name: Optional[str]
- """constraint name"""
-
constrained_columns: List[str]
"""column names which comprise the constraint"""
object"""
-class ReflectedForeignKeyConstraint(TypedDict):
+class ReflectedForeignKeyConstraint(ReflectedConstraint):
"""Dictionary representing the reflected elements corresponding to
:class:`.ForeignKeyConstraint`.
"""
- name: Optional[str]
- """constraint name"""
-
constrained_columns: List[str]
"""local column names which comprise the constraint"""
definition of a Table or Column. If False, this implies that ALTER must
be used to set table and column comments."""
+ supports_constraint_comments: bool
+ """Indicates if the dialect supports comment DDL on constraints.
+
+ .. versionadded:: 2.0
+ """
+
_has_events = False
supports_statement_cache: bool = True
* ``name`` -
optional name of the primary key constraint.
+ * ``comment`` -
+ optional comment on the primary key constraint.
+
:param table_name: string name of the table. For special quoting,
use :class:`.quoted_name`.
* ``name`` -
optional name of the foreign key constraint.
+ * ``comment`` -
+ optional comment on the foreign key constraint
+
:param table_name: string name of the table. For special quoting,
use :class:`.quoted_name`.
* ``column_names`` -
list of column names in order
+ * ``comment`` -
+ optional comment on the constraint
+
:param table_name: string name of the table. For special quoting,
use :class:`.quoted_name`.
may or may not be present; a dictionary with additional
dialect-specific options for this CHECK constraint
+ * ``comment`` -
+ optional comment on the constraint
+
.. versionadded:: 1.3.8
:param table_name: string name of the table. For special quoting,
if pk in cols_by_orig_name and pk not in exclude_columns
]
- # update pk constraint name
+ # update pk constraint name and comment
table.primary_key.name = pk_cons.get("name")
+ table.primary_key.comment = pk_cons.get("comment", None)
# tell the PKConstraint to re-initialize
# its column collection
refspec,
conname,
link_to_name=True,
+ comment=fkey_d.get("comment"),
**options,
)
)
for const_d in constraints:
conname = const_d["name"]
columns = const_d["column_names"]
+ comment = const_d.get("comment")
duplicates = const_d.get("duplicates_index")
if include_columns and not set(columns).issubset(include_columns):
continue
else:
constrained_cols.append(constrained_col)
table.append_constraint(
- sa_schema.UniqueConstraint(*constrained_cols, name=conname)
+ sa_schema.UniqueConstraint(
+ *constrained_cols, name=conname, comment=comment
+ )
)
def _reflect_check_constraints(
from .sql.ddl import DDLElement as DDLElement
from .sql.ddl import DropColumnComment as DropColumnComment
from .sql.ddl import DropConstraint as DropConstraint
+from .sql.ddl import DropConstraintComment as DropConstraintComment
from .sql.ddl import DropIndex as DropIndex
from .sql.ddl import DropSchema as DropSchema
from .sql.ddl import DropSequence as DropSequence
from .sql.ddl import ExecutableDDLElement as ExecutableDDLElement
from .sql.ddl import InvokeDDLBase as InvokeDDLBase
from .sql.ddl import SetColumnComment as SetColumnComment
+from .sql.ddl import SetConstraintComment as SetConstraintComment
from .sql.ddl import SetTableComment as SetTableComment
from .sql.ddl import sort_tables as sort_tables
from .sql.ddl import (
drop.element, use_table=True
)
+ def visit_set_constraint_comment(self, create, **kw):
+ raise exc.UnsupportedCompilationError(self, type(create))
+
+ def visit_drop_constraint_comment(self, drop, **kw):
+ raise exc.UnsupportedCompilationError(self, type(drop))
+
def get_identity_options(self, identity_options):
text = []
if identity_options.increment is not None:
__visit_name__ = "drop_column_comment"
+class SetConstraintComment(_CreateDropBase):
+ """Represent a COMMENT ON CONSTRAINT IS statement."""
+
+ __visit_name__ = "set_constraint_comment"
+
+
+class DropConstraintComment(_CreateDropBase):
+ """Represent a COMMENT ON CONSTRAINT IS NULL statement."""
+
+ __visit_name__ = "drop_constraint_comment"
+
+
class InvokeDDLBase(SchemaVisitor):
def __init__(self, connection):
self.connection = connection
if column.comment is not None:
SetColumnComment(column)._invoke_with(self.connection)
+ if self.dialect.supports_constraint_comments:
+ for constraint in table.constraints:
+ if constraint.comment is not None:
+ self.connection.execute(
+ SetConstraintComment(constraint)
+ )
+
table.dispatch.after_create(
table,
self.connection,
link_to_name: bool = False,
match: Optional[str] = None,
info: Optional[_InfoType] = None,
+ comment: Optional[str] = None,
_unresolvable: bool = False,
**dialect_kw: Any,
):
.. versionadded:: 1.0.0
+ :param comment: Optional string that will render an SQL comment on
+ foreign key constraint creation.
+
+ .. versionadded:: 2.0
+
:param \**dialect_kw: Additional keyword arguments are dialect
specific, and passed in the form ``<dialectname>_<argname>``. The
arguments are ultimately handled by a corresponding
self.initially = initially
self.link_to_name = link_to_name
self.match = match
+ self.comment = comment
if info:
self.info = info
self._unvalidated_dialect_kw = dialect_kw
initially=self.initially,
link_to_name=self.link_to_name,
match=self.match,
+ comment=self.comment,
**self._unvalidated_dialect_kw,
)
return self._schema_item_copy(fk)
deferrable=self.deferrable,
initially=self.initially,
match=self.match,
+ comment=self.comment,
**self._unvalidated_dialect_kw,
)
self.constraint._append_element(column, self)
deferrable: Optional[bool] = None,
initially: Optional[str] = None,
info: Optional[_InfoType] = None,
+ comment: Optional[str] = None,
_create_rule: Optional[Any] = None,
_type_bound: bool = False,
**dialect_kw: Any,
.. versionadded:: 1.0.0
+ :param comment: Optional string that will render an SQL comment on
+ the constraint when created.
+
+ .. versionadded:: 2.0
+
:param \**dialect_kw: Additional keyword arguments are dialect
specific, and passed in the form ``<dialectname>_<argname>``. See
the documentation regarding an individual dialect at
self._type_bound = _type_bound
util.set_creation_order(self)
self._validate_dialect_kwargs(dialect_kw)
+ self.comment = comment
def _should_create_for_compiler(
self, compiler: DDLCompiler, **kw: Any
_copy_expression(expr, self.parent, target_table)
for expr in self._columns
],
+ comment=self.comment,
**constraint_kwargs,
)
return self._schema_item_copy(c)
deferrable=self.deferrable,
_create_rule=self._create_rule,
table=target_table,
+ comment=self.comment,
_autoattach=False,
_type_bound=self._type_bound,
)
match: Optional[str] = None,
table: Optional[Table] = None,
info: Optional[_InfoType] = None,
+ comment: Optional[str] = None,
**dialect_kw: Any,
) -> None:
r"""Construct a composite-capable FOREIGN KEY.
.. versionadded:: 1.0.0
+ :param comment: Optional string that will render an SQL comment on
+ foreign key constraint creation.
+
+ .. versionadded:: 2.0
+
:param \**dialect_kw: Additional keyword arguments are dialect
specific, and passed in the form ``<dialectname>_<argname>``. See
the documentation regarding an individual dialect at
deferrable=deferrable,
initially=initially,
info=info,
+ comment=comment,
**dialect_kw,
)
self.onupdate = onupdate
initially=self.initially,
link_to_name=self.link_to_name,
match=self.match,
+ comment=self.comment,
)
for self_fk, other_fk in zip(self.elements, fkc.elements):
self_fk._schema_item_copy(other_fk)
@property
def comment_reflection(self):
+ """Indicates if the database support table comment reflection"""
+ return exclusions.closed()
+
+ @property
+ def constraint_comment_reflection(self):
+ """indicates if the database support constraint on constraints
+ and their reflection"""
return exclusions.closed()
@property
schema_prefix = ""
if testing.requires.self_referential_foreign_keys.enabled:
- users = Table(
- "users",
- metadata,
- Column("user_id", sa.INT, primary_key=True),
- Column("test1", sa.CHAR(5), nullable=False),
- Column("test2", sa.Float(), nullable=False),
- Column(
- "parent_user_id",
- sa.Integer,
- sa.ForeignKey(
- "%susers.user_id" % schema_prefix, name="user_id_fk"
- ),
+ parent_id_args = (
+ ForeignKey(
+ "%susers.user_id" % schema_prefix, name="user_id_fk"
),
- sa.CheckConstraint("test2 > 0", name="test2_gt_zero"),
- schema=schema,
- test_needs_fk=True,
)
else:
- users = Table(
- "users",
- metadata,
- Column("user_id", sa.INT, primary_key=True),
- Column("test1", sa.CHAR(5), nullable=False),
- Column("test2", sa.Float(), nullable=False),
- Column("parent_user_id", sa.Integer),
- sa.CheckConstraint("test2 > 0", name="test2_gt_zero"),
- schema=schema,
- test_needs_fk=True,
- )
+ parent_id_args = ()
+ users = Table(
+ "users",
+ metadata,
+ Column("user_id", sa.INT, primary_key=True),
+ Column("test1", sa.CHAR(5), nullable=False),
+ Column("test2", sa.Float(), nullable=False),
+ Column("parent_user_id", sa.Integer, *parent_id_args),
+ sa.CheckConstraint(
+ "test2 > 0",
+ name="zz_test2_gt_zero",
+ comment="users check constraint",
+ ),
+ sa.CheckConstraint("test2 <= 1000"),
+ schema=schema,
+ test_needs_fk=True,
+ )
Table(
"dingalings",
Column(
"address_id",
sa.Integer,
- sa.ForeignKey(
+ ForeignKey(
"%semail_addresses.address_id" % schema_prefix,
- name="email_add_id_fg",
+ name="zz_email_add_id_fg",
+ comment="di fk comment",
),
),
+ Column(
+ "id_user",
+ sa.Integer,
+ ForeignKey("%susers.user_id" % schema_prefix),
+ ),
Column("data", sa.String(30), unique=True),
sa.CheckConstraint(
"address_id > 0 AND address_id < 1000",
name="address_id_gt_zero",
),
sa.UniqueConstraint(
- "address_id", "dingaling_id", name="zz_dingalings_multiple"
+ "address_id",
+ "dingaling_id",
+ name="zz_dingalings_multiple",
+ comment="di unique comment",
),
schema=schema,
test_needs_fk=True,
"email_addresses",
metadata,
Column("address_id", sa.Integer),
- Column(
- "remote_user_id", sa.Integer, sa.ForeignKey(users.c.user_id)
- ),
+ Column("remote_user_id", sa.Integer, ForeignKey(users.c.user_id)),
Column("email_address", sa.String(20), index=True),
- sa.PrimaryKeyConstraint("address_id", name="email_ad_pk"),
+ sa.PrimaryKeyConstraint(
+ "address_id", name="email_ad_pk", comment="ea pk comment"
+ ),
schema=schema,
test_needs_fk=True,
)
(schema, "dingalings_v"): [
col("dingaling_id", auto="omit", nullable=mock.ANY),
col("address_id"),
+ col("id_user"),
col("data"),
]
}
(schema, "dingalings"): [
pk("dingaling_id"),
col("address_id"),
+ col("id_user"),
col("data"),
],
(schema, "email_addresses"): [
kind=ObjectKind.ANY,
filter_names=None,
):
- def pk(*cols, name=mock.ANY):
- return {"constrained_columns": list(cols), "name": name}
+ def pk(*cols, name=mock.ANY, comment=None):
+ return {
+ "constrained_columns": list(cols),
+ "name": name,
+ "comment": comment,
+ }
empty = pk(name=None)
if testing.requires.materialized_views_reflect_pk.enabled:
tables = {
(schema, "users"): pk("user_id"),
(schema, "dingalings"): pk("dingaling_id"),
- (schema, "email_addresses"): pk("address_id", name="email_ad_pk"),
+ (schema, "email_addresses"): pk(
+ "address_id", name="email_ad_pk", comment="ea pk comment"
+ ),
(schema, "comment_test"): pk("id"),
(schema, "no_constraints"): empty,
(schema, "local_table"): pk("id"),
or config.db.dialect.default_schema_name == other
)
- def fk(cols, ref_col, ref_table, ref_schema=schema, name=mock.ANY):
+ def fk(
+ cols,
+ ref_col,
+ ref_table,
+ ref_schema=schema,
+ name=mock.ANY,
+ comment=None,
+ ):
return {
"constrained_columns": cols,
"referred_columns": ref_col,
if ref_schema is not None
else tt(),
"referred_table": ref_table,
+ "comment": comment,
}
materialized = {(schema, "dingalings_v"): []}
fk(["parent_user_id"], ["user_id"], "users", name="user_id_fk")
],
(schema, "dingalings"): [
+ fk(["id_user"], ["user_id"], "users"),
fk(
["address_id"],
["address_id"],
"email_addresses",
- name="email_add_id_fg",
- )
+ name="zz_email_add_id_fg",
+ comment="di fk comment",
+ ),
],
(schema, "email_addresses"): [
fk(["remote_user_id"], ["user_id"], "users")
],
(schema, "dingalings"): [
*idx("data", name=mock.ANY, unique=True, duplicates=True),
+ *idx("id_user", name=mock.ANY, fk=True),
*idx(
"address_id",
"dingaling_id",
filter_names=None,
all_=False,
):
- def uc(*cols, name, duplicates_index=None, is_index=False):
+ def uc(
+ *cols, name, duplicates_index=None, is_index=False, comment=None
+ ):
req = testing.requires.unique_index_reflect_as_unique_constraints
if is_index and not req.enabled:
return ()
res = {
"column_names": list(cols),
"name": name,
+ "comment": comment,
}
if duplicates_index:
res["duplicates_index"] = duplicates_index
"dingaling_id",
name="zz_dingalings_multiple",
duplicates_index="zz_dingalings_multiple",
+ comment="di unique comment",
),
],
(schema, "email_addresses"): [],
)
return self in res
- def cc(text, name):
- return {"sqltext": tt(text), "name": name}
+ def cc(text, name, comment=None):
+ return {"sqltext": tt(text), "name": name, "comment": comment}
# print({1: "test2 > (0)::double precision"} == {1: tt("test2 > 0")})
# assert 0
}
self._resolve_views(views, materialized)
tables = {
- (schema, "users"): [cc("test2 > 0", "test2_gt_zero")],
+ (schema, "users"): [
+ cc("test2 <= 1000", mock.ANY),
+ cc(
+ "test2 > 0",
+ "zz_test2_gt_zero",
+ comment="users check constraint",
+ ),
+ ],
(schema, "dingalings"): [
cc(
"address_id > 0 and address_id < 1000",
dupe = refl.pop("duplicates_index", None)
if dupe:
names_that_duplicate_index.add(dupe)
+ eq_(refl.pop("comment", None), None)
eq_(orig, refl)
reflected_metadata = MetaData()
"table",
metadata,
Column("id", Integer, primary_key=True),
- Column("x_id", Integer, sa.ForeignKey("x.id", name="xid")),
+ Column("x_id", Integer, ForeignKey("x.id", name="xid")),
Column("test", String(10)),
test_needs_fk=True,
)
from sqlalchemy.engine import ObjectKind
from sqlalchemy.engine import ObjectScope
from sqlalchemy.schema import CreateIndex
+from sqlalchemy.sql import ddl as sa_ddl
from sqlalchemy.sql.schema import CheckConstraint
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import fixtures
"initially": "DEFERRED",
"match": "FULL",
},
+ "comment": None,
},
"company_industry_id_fkey": {
"name": "company_industry_id_fkey",
"referred_table": "industry",
"referred_schema": None,
"options": {"onupdate": "CASCADE", "ondelete": "CASCADE"},
+ "comment": None,
},
}
metadata.create_all(connection)
)
def test_reflect_check_warning(self):
- rows = [("foo", "some name", "NOTCHECK foobar")]
+ rows = [("foo", "some name", "NOTCHECK foobar", None)]
conn = mock.Mock(
execute=lambda *arg, **kw: mock.MagicMock(
fetchall=lambda: rows, __iter__=lambda self: iter(rows)
def test_reflect_extra_newlines(self):
rows = [
- ("foo", "some name", "CHECK (\n(a \nIS\n NOT\n\n NULL\n)\n)"),
- ("foo", "some other name", "CHECK ((b\nIS\nNOT\nNULL))"),
- ("foo", "some CRLF name", "CHECK ((c\r\n\r\nIS\r\nNOT\r\nNULL))"),
- ("foo", "some name", "CHECK (c != 'hi\nim a name\n')"),
+ (
+ "foo",
+ "some name",
+ "CHECK (\n(a \nIS\n NOT\n\n NULL\n)\n)",
+ None,
+ ),
+ ("foo", "some other name", "CHECK ((b\nIS\nNOT\nNULL))", None),
+ (
+ "foo",
+ "some CRLF name",
+ "CHECK ((c\r\n\r\nIS\r\nNOT\r\nNULL))",
+ None,
+ ),
+ ("foo", "some name", "CHECK (c != 'hi\nim a name\n')", None),
]
conn = mock.Mock(
execute=lambda *arg, **kw: mock.MagicMock(
{
"name": "some name",
"sqltext": "a \nIS\n NOT\n\n NULL\n",
+ "comment": None,
+ },
+ {
+ "name": "some other name",
+ "sqltext": "b\nIS\nNOT\nNULL",
+ "comment": None,
},
- {"name": "some other name", "sqltext": "b\nIS\nNOT\nNULL"},
{
"name": "some CRLF name",
"sqltext": "c\r\n\r\nIS\r\nNOT\r\nNULL",
+ "comment": None,
+ },
+ {
+ "name": "some name",
+ "sqltext": "c != 'hi\nim a name\n'",
+ "comment": None,
},
- {"name": "some name", "sqltext": "c != 'hi\nim a name\n'"},
],
)
def test_reflect_with_not_valid_check_constraint(self):
- rows = [("foo", "some name", "CHECK ((a IS NOT NULL)) NOT VALID")]
+ rows = [
+ ("foo", "some name", "CHECK ((a IS NOT NULL)) NOT VALID", None)
+ ]
conn = mock.Mock(
execute=lambda *arg, **kw: mock.MagicMock(
fetchall=lambda: rows, __iter__=lambda self: iter(rows)
"name": "some name",
"sqltext": "a IS NOT NULL",
"dialect_options": {"not_valid": True},
+ "comment": None,
}
],
)
[["b"]],
)
+ def test_reflection_constraint_comments(self, connection, metadata):
+ t = Table(
+ "foo",
+ metadata,
+ Column("id", Integer),
+ Column("foo_id", ForeignKey("foo.id", name="fk_1")),
+ Column("foo_other_id", ForeignKey("foo.id", name="fk_2")),
+ CheckConstraint("id>0", name="ch_1"),
+ CheckConstraint("id<1000", name="ch_2"),
+ PrimaryKeyConstraint("id", name="foo_pk"),
+ UniqueConstraint("id", "foo_id", name="un_1"),
+ UniqueConstraint("id", "foo_other_id", name="un_2"),
+ )
+ metadata.create_all(connection)
+
+ def check(elements, exp):
+ elements = {c["name"]: c["comment"] for c in elements}
+ eq_(elements, exp)
+
+ def all_none():
+ insp = inspect(connection)
+ is_(insp.get_pk_constraint("foo")["comment"], None)
+ check(
+ insp.get_check_constraints("foo"), {"ch_1": None, "ch_2": None}
+ )
+ check(
+ insp.get_unique_constraints("foo"),
+ {"un_1": None, "un_2": None},
+ )
+ check(insp.get_foreign_keys("foo"), {"fk_1": None, "fk_2": None})
+
+ all_none()
+
+ c = next(c for c in t.constraints if c.name == "ch_1")
+ u = next(c for c in t.constraints if c.name == "un_1")
+ f = next(c for c in t.foreign_key_constraints if c.name == "fk_1")
+ p = t.primary_key
+ c.comment = "cc comment"
+ u.comment = "uc comment"
+ f.comment = "fc comment"
+ p.comment = "pk comment"
+ for cst in [c, u, f, p]:
+ connection.execute(sa_ddl.SetConstraintComment(cst))
+
+ insp = inspect(connection)
+ eq_(insp.get_pk_constraint("foo")["comment"], "pk comment")
+ check(
+ insp.get_check_constraints("foo"),
+ {"ch_1": "cc comment", "ch_2": None},
+ )
+ check(
+ insp.get_unique_constraints("foo"),
+ {"un_1": "uc comment", "un_2": None},
+ )
+ check(
+ insp.get_foreign_keys("foo"), {"fk_1": "fc comment", "fk_2": None}
+ )
+
+ for cst in [c, u, f, p]:
+ connection.execute(sa_ddl.DropConstraintComment(cst))
+ all_none()
+
class CustomTypeReflectionTest(fixtures.TestBase):
class CustomType:
eq_(
inspector.get_check_constraints("cp"),
[
- {"sqltext": "q > 1 AND q < 6", "name": None},
{"sqltext": "q == 1 OR (q > 2 AND q < 5)", "name": "cq"},
+ {"sqltext": "q > 1 AND q < 6", "name": None},
],
)
def log(fn):
@wraps(fn)
def wrap(*a, **kw):
- print("Running ", fn.__name__, "...", flush=True, end="")
+ print("Running", fn.__name__, "...", flush=True, end="")
try:
r = fn(*a, **kw)
except NotImplementedError:
def comment_reflection(self):
return only_on(["postgresql", "mysql", "mariadb", "oracle"])
+ @property
+ def constraint_comment_reflection(self):
+ return only_on(["postgresql"])
+
@property
def unbounded_varchar(self):
"""Target database must support VARCHAR with no length"""
deferrable="Z",
initially="Q",
link_to_name=True,
+ comment="foo",
)
fk1 = ForeignKey(c1, **kw)
name="name",
initially=True,
deferrable=True,
+ comment="foo",
_create_rule=r,
)
c2 = c._copy()
eq_(str(c2.sqltext), "foo bar")
eq_(c2.initially, True)
eq_(c2.deferrable, True)
+ eq_(c2.comment, "foo")
assert c2._create_rule is r
def test_col_replace_w_constraint(self):
for c in t3.constraints:
assert c.table is t3
+ def test_ColumnCollectionConstraint_copy(self):
+ m = MetaData()
+
+ t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
+ t2 = Table("t2", m, Column("a", Integer), Column("b", Integer))
+
+ kw = {
+ "comment": "baz",
+ "name": "ccc",
+ "initially": "foo",
+ "deferrable": "bar",
+ }
+
+ UniqueConstraint(t.c.a, **kw)
+ CheckConstraint(t.c.a > 5, **kw)
+ ForeignKeyConstraint([t.c.a], [t2.c.a], **kw)
+ PrimaryKeyConstraint(t.c.a, **kw)
+
+ m2 = MetaData()
+
+ t3 = t.to_metadata(m2)
+
+ eq_(len(t3.constraints), 4)
+
+ for c in t3.constraints:
+ assert c.table is t3
+ for k, v in kw.items():
+ eq_(getattr(c, k), v)
+
def test_check_constraint_copy(self):
m = MetaData()
t = Table("tbl", m, Column("a", Integer), Column("b", Integer))