connection, table_name, schema=schema, **kw
)
- # Notes:
- # * The pattern currently matches any character for the name of the
- # constraint, including newline characters (re.S flag) as long as
- # none of the SQLite's table constraints keywords are encountered
- # by a negative lookahead.
- # This prevents the pattern from matching subsequent constraints
- # as part of the name.
- # This is only done for those keywords if seperated by spaces, to
- # support constraint names that contains them e.g. "check_value".
- #
- # * Because check constraint definitions can also contain newline
- # or tab characters, the pattern matches any character untill either
- # the beginning of the next constraint statement using a
- # non-capturing and non-consuming group, allowing the next one
- # to match, or the end of the table definition
- # e.g. newline and closing ')'.
- CHECK_PATTERN = r"""
- # Non-capturing group for the name part of named check constraints.
- # This group is optional as unnamed check constraints can exist.
- (?:
- # Match beginning of constraint definition seperated by whitespace.
- CONSTRAINT\s
-
- # First capturing group that matches the actual name of the constraint.
- # Any characters is allowed, as long as none of the reserved table
- # constraint keywords are encountered using a negative lookahead.
- ((?:(?!\sPRIMARY\s|\sFOREIGN\sKEY|\sUNIQUE\s|\sCHECK\s).)+)
-
- # End of optional non-capturing name group seperated by whitespace.
- \s)?
-
- # Match beginning of the check expression with starting parenthesis
- # and optional whitespace.
- CHECK\s?\(
-
- # Match actual expression, which can be any character.
- (.+?)
-
- # End parenthesis of the check expression.
- \)
-
- # Non-capturing group that helps denote the end of the check
- # expression part.
- # This can either be (1) the beginning of the next constraint,
- # or (2) the end of the table definition.
- (?:
-
- # (1) Matches end of check constraint with trailing comma,
- # optional whitespace (including newline), and the beginning
- # of the next constraint (either named or unnamed).
- ,[\s\n]*(?=CONSTRAINT|CHECK|UNIQUE|FOREIGN|PRIMARY)
- # OR operator, seperating (1) & (2)
- |
- # (2) Matches end parenthesis of table definition, seperated by
- # newline.
- \n\)
- # End of non-capturing group.
- )
- """
+ # NOTE NOTE NOTE
+ # DO NOT CHANGE THIS REGULAR EXPRESSION. There is no known way
+ # to parse CHECK constraints that contain newlines themselves using
+ # regular expressions, and the approach here relies upon each
+ # individual
+ # CHECK constraint being on a single line by itself. This
+ # necessarily makes assumptions as to how the CREATE TABLE
+ # was emitted. A more comprehensive DDL parsing solution would be
+ # needed to improve upon the current situation. See #11840 for
+ # background
+ CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?CHECK *\( *(.+) *\),? *"
cks = []
- for match in re.finditer(
- CHECK_PATTERN, table_data or "", re.I | re.S | re.VERBOSE
- ):
+
+ for match in re.finditer(CHECK_PATTERN, table_data or "", re.I):
+
name = match.group(1)
if name:
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors
+import contextlib
import operator
import re
class ComponentReflectionTestExtra(ComparesIndexes, fixtures.TestBase):
__backend__ = True
- @testing.combinations(
- (True, testing.requires.schemas), (False,), argnames="use_schema"
- )
- @testing.requires.check_constraint_reflection
- def test_get_check_constraints(self, metadata, connection, use_schema):
- if use_schema:
- schema = config.test_schema
+ @testing.fixture(params=[True, False])
+ def use_schema_fixture(self, request):
+ if request.param:
+ return config.test_schema
else:
- schema = None
+ return None
- Table(
- "sa_cc",
- metadata,
- Column("a", Integer()),
- sa.CheckConstraint("a > 1 AND a < 5", name="cc1"),
- sa.CheckConstraint(
- "a = 1 OR (a > 2 AND a < 5)", name="UsesCasing"
- ),
- schema=schema,
- )
- Table(
- "no_constraints",
- metadata,
- Column("data", sa.String(20)),
- schema=schema,
- )
+ @testing.fixture()
+ def inspect_for_table(self, metadata, connection, use_schema_fixture):
+ @contextlib.contextmanager
+ def go(tablename):
+ yield use_schema_fixture, inspect(connection)
- metadata.create_all(connection)
+ metadata.create_all(connection)
- insp = inspect(connection)
- reflected = sorted(
- insp.get_check_constraints("sa_cc", schema=schema),
- key=operator.itemgetter("name"),
- )
+ return go
+ def ck_eq(self, reflected, expected):
# trying to minimize effect of quoting, parenthesis, etc.
# may need to add more to this as new dialects get CHECK
# constraint reflection support
def normalize(sqltext):
return " ".join(
- re.findall(r"and|\d|=|a|or|<|>", sqltext.lower(), re.I)
+ re.findall(r"and|\d|=|a|b|c|or|<|>", sqltext.lower(), re.I)
)
- reflected = [
- {"name": item["name"], "sqltext": normalize(item["sqltext"])}
- for item in reflected
- ]
- eq_(
+ reflected = sorted(
+ [
+ {"name": item["name"], "sqltext": normalize(item["sqltext"])}
+ for item in reflected
+ ],
+ key=lambda item: (item["sqltext"]),
+ )
+
+ expected = sorted(
+ expected,
+ key=lambda item: (item["sqltext"]),
+ )
+ eq_(reflected, expected)
+
+ @testing.requires.check_constraint_reflection
+ def test_check_constraint_no_constraint(self, metadata, inspect_for_table):
+ with inspect_for_table("no_constraints") as (schema, inspector):
+ Table(
+ "no_constraints",
+ metadata,
+ Column("data", sa.String(20)),
+ schema=schema,
+ )
+
+ self.ck_eq(
+ inspector.get_check_constraints("no_constraints", schema=schema),
+ [],
+ )
+
+ @testing.requires.inline_check_constraint_reflection
+ @testing.combinations(
+ "my_inline", "MyInline", None, argnames="constraint_name"
+ )
+ def test_check_constraint_inline(
+ self, metadata, inspect_for_table, constraint_name
+ ):
+
+ with inspect_for_table("sa_cc") as (schema, inspector):
+ Table(
+ "sa_cc",
+ metadata,
+ Column("id", Integer(), primary_key=True),
+ Column(
+ "a",
+ Integer(),
+ sa.CheckConstraint(
+ "a > 1 AND a < 5", name=constraint_name
+ ),
+ ),
+ Column("data", String(50)),
+ schema=schema,
+ )
+
+ reflected = inspector.get_check_constraints("sa_cc", schema=schema)
+
+ self.ck_eq(
+ reflected,
+ [
+ {
+ "name": constraint_name or mock.ANY,
+ "sqltext": "a > 1 and a < 5",
+ },
+ ],
+ )
+
+ @testing.requires.check_constraint_reflection
+ @testing.combinations(
+ "my_ck_const", "MyCkConst", None, argnames="constraint_name"
+ )
+ def test_check_constraint_standalone(
+ self, metadata, inspect_for_table, constraint_name
+ ):
+ with inspect_for_table("sa_cc") as (schema, inspector):
+ Table(
+ "sa_cc",
+ metadata,
+ Column("a", Integer()),
+ sa.CheckConstraint(
+ "a = 1 OR (a > 2 AND a < 5)", name=constraint_name
+ ),
+ schema=schema,
+ )
+
+ reflected = inspector.get_check_constraints("sa_cc", schema=schema)
+
+ self.ck_eq(
+ reflected,
+ [
+ {
+ "name": constraint_name or mock.ANY,
+ "sqltext": "a = 1 or a > 2 and a < 5",
+ },
+ ],
+ )
+
+ @testing.requires.inline_check_constraint_reflection
+ def test_check_constraint_mixed(self, metadata, inspect_for_table):
+ with inspect_for_table("sa_cc") as (schema, inspector):
+ Table(
+ "sa_cc",
+ metadata,
+ Column("id", Integer(), primary_key=True),
+ Column("a", Integer(), sa.CheckConstraint("a > 1 AND a < 5")),
+ Column(
+ "b",
+ Integer(),
+ sa.CheckConstraint("b > 1 AND b < 5", name="my_inline"),
+ ),
+ Column("c", Integer()),
+ Column("data", String(50)),
+ sa.UniqueConstraint("data", name="some_uq"),
+ sa.CheckConstraint("c > 1 AND c < 5", name="cc1"),
+ sa.UniqueConstraint("c", name="some_c_uq"),
+ schema=schema,
+ )
+
+ reflected = inspector.get_check_constraints("sa_cc", schema=schema)
+
+ self.ck_eq(
reflected,
[
- {"name": "UsesCasing", "sqltext": "a = 1 or a > 2 and a < 5"},
- {"name": "cc1", "sqltext": "a > 1 and a < 5"},
+ {"name": "cc1", "sqltext": "c > 1 and c < 5"},
+ {"name": "my_inline", "sqltext": "b > 1 and b < 5"},
+ {"name": mock.ANY, "sqltext": "a > 1 and a < 5"},
],
)
- no_cst = "no_constraints"
- eq_(insp.get_check_constraints(no_cst, schema=schema), [])
@testing.requires.indexes_with_expressions
def test_reflect_expression_based_indexes(self, metadata, connection):
conn.exec_driver_sql(
"CREATE TABLE cp ("
- "q INTEGER check (q > 1 AND q < 6),\n"
- "CONSTRAINT cq CHECK (q == 1 OR (q > 2 AND q < 5))\n"
+ "id INTEGER NOT NULL,\n"
+ "q INTEGER, \n"
+ "p INTEGER, \n"
+ "CONSTRAINT cq CHECK (p = 1 OR (p > 2 AND p < 5)),\n"
+ "PRIMARY KEY (id)\n"
+ ")"
+ )
+
+ conn.exec_driver_sql(
+ "CREATE TABLE cp_inline (\n"
+ "id INTEGER NOT NULL,\n"
+ "q INTEGER CHECK (q > 1 AND q < 6), \n"
+ "p INTEGER CONSTRAINT cq CHECK (p = 1 OR (p > 2 AND p < 5)),\n"
+ "PRIMARY KEY (id)\n"
")"
)
{"constrained_columns": [], "name": None},
)
- def test_check_constraint(self):
+ def test_check_constraint_plain(self):
inspector = inspect(testing.db)
eq_(
inspector.get_check_constraints("cp"),
[
- {"sqltext": "q == 1 OR (q > 2 AND q < 5)", "name": "cq"},
+ {"sqltext": "p = 1 OR (p > 2 AND p < 5)", "name": "cq"},
+ ],
+ )
+
+ def test_check_constraint_inline_plain(self):
+ inspector = inspect(testing.db)
+ eq_(
+ inspector.get_check_constraints("cp_inline"),
+ [
+ {"sqltext": "p = 1 OR (p > 2 AND p < 5)", "name": "cq"},
{"sqltext": "q > 1 AND q < 6", "name": None},
],
)
+
+ @testing.fails("need to come up with new regex and/or DDL parsing")
+ def test_check_constraint_multiline(self):
+ """test for #11677"""
+
+ inspector = inspect(testing.db)
eq_(
inspector.get_check_constraints("r"),
[