]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Implemented CHECK constraint reflection for SQLite and PostgreSQL
authorAlex Grönholm <alex.gronholm@nextday.fi>
Mon, 11 Apr 2016 21:01:42 +0000 (17:01 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Wed, 1 Jun 2016 16:57:36 +0000 (12:57 -0400)
Co-Authored-By: Mike Bayer <mike_mp@zzzcomputing.com>
Change-Id: Ie6cf2d2958d1c567324db9e08fef2d3186e97350
Pull-request: https://bitbucket.org/zzzeek/sqlalchemy/pull-requests/80

doc/build/changelog/changelog_11.rst
doc/build/changelog/migration_11.rst
lib/sqlalchemy/dialects/postgresql/base.py
lib/sqlalchemy/dialects/sqlite/base.py
lib/sqlalchemy/engine/interfaces.py
lib/sqlalchemy/engine/reflection.py
test/dialect/postgresql/test_reflection.py
test/dialect/test_sqlite.py
test/engine/test_reflection.py
test/requirements.py

index 27513e5691d03341db13f433aa934af2a66a88b4..f86a0f1dc3b96fb71971cd1aec3ba2c834a83da4 100644 (file)
         symbol when appropriate in conjunction with the ``VARBINARY``
         data type.  Pull request courtesy Sheila Allen.
 
+    .. change::
+        :tags: feature, sql
+        :pullreq: bitbucket:80
+
+        Implemented reflection of CHECK constraints for SQLite and Postgresql.
+        This is available via the new inspector method
+        :meth:`.Inspector.get_check_constraints` as well as when reflecting
+        :class:`.Table` objects in the form of :class:`.CheckConstraint`
+        objects present in the constraints collection.  Pull request courtesy
+        Alex Grönholm.
+
     .. change::
         :tags: change, orm
         :tickets: 3394
index 09394613011f2a96d4282822ff86f94ad1c905e0..b32183a6e26f59b6a44de3451da4388849d2d0c1 100644 (file)
@@ -2033,6 +2033,14 @@ emits::
 
 :ticket:`2729`
 
+Check constraints now reflect
+-----------------------------
+
+The Postgresql dialect now supports reflection of CHECK constraints
+both within the method :meth:`.Inspector.get_check_constraints` as well
+as within :class:`.Table` reflection within the :attr:`.Table.constraints`
+collection.
+
 Support for PyGreSQL
 --------------------
 
@@ -2220,6 +2228,14 @@ foreign key constraints in recent SQLAlchemy versions.
 
 :ticket:`3629`
 
+Check constraints now reflect
+-----------------------------
+
+The SQLite dialect now supports reflection of CHECK constraints
+both within the method :meth:`.Inspector.get_check_constraints` as well
+as within :class:`.Table` reflection within the :attr:`.Table.constraints`
+collection.
+
 Dialect Improvements and Changes - SQL Server
 =============================================
 
index 9d019b56eefcea9c1c7c155ad431731c825bcd16..fe3d29450669983e10fd0862206380dc0e1657e4 100644 (file)
@@ -2497,6 +2497,31 @@ class PGDialect(default.DefaultDialect):
             for name, uc in uniques.items()
         ]
 
+    @reflection.cache
+    def get_check_constraints(
+            self, connection, table_name, schema=None, **kw):
+        table_oid = self.get_table_oid(connection, table_name, schema,
+                                       info_cache=kw.get('info_cache'))
+
+        CHECK_SQL = """
+            SELECT
+                cons.conname as name,
+                cons.consrc as src
+            FROM
+                pg_catalog.pg_constraint cons
+            WHERE
+                cons.conrelid = :table_oid AND
+                cons.contype = 'c'
+        """
+
+        c = connection.execute(sql.text(CHECK_SQL), table_oid=table_oid)
+
+        return [
+            {'name': name,
+             'sqltext': src[1:-1]}
+            for name, src in c.fetchall()
+            ]
+
     def _load_enums(self, connection, schema=None):
         schema = schema or self.default_schema_name
         if not self.supports_native_enum:
index ddd86944882ee0f21742450d645a9d078692f8e1..2fe10556beabfc996772e9ef4b4e9679f830162c 100644 (file)
@@ -1473,6 +1473,32 @@ class SQLiteDialect(default.DefaultDialect):
         # the PRIMARY KEY may have an entry.
         return unique_constraints
 
+    @reflection.cache
+    def get_check_constraints(self, connection, table_name,
+                              schema=None, **kw):
+        table_data = self._get_table_sql(
+            connection, table_name, schema=schema, **kw)
+        if not table_data:
+            return []
+
+        CHECK_PATTERN = (
+            '(?:CONSTRAINT (\w+) +)?'
+            'CHECK *\( *(.+) *\),? *'
+        )
+        check_constraints = []
+        # NOTE: we aren't using re.S here because we actually are
+        # taking advantage of each CHECK constraint being all on one
+        # line in the table definition in order to delineate.  This
+        # necessarily makes assumptions as to how the CREATE TABLE
+        # was emitted.
+        for match in re.finditer(CHECK_PATTERN, table_data, re.I):
+            check_constraints.append({
+                'sqltext': match.group(2),
+                'name': match.group(1)
+            })
+
+        return check_constraints
+
     @reflection.cache
     def get_indexes(self, connection, table_name, schema=None, **kw):
         pragma_indexes = self._get_table_pragma(
index 26731f9a52a656204bb509b272187267042dda7b..13e8bf1f48e3b11c254ad32c68c97037657e66bc 100644 (file)
@@ -399,6 +399,29 @@ class Dialect(object):
 
         raise NotImplementedError()
 
+    def get_check_constraints(
+            self, connection, table_name, schema=None, **kw):
+        """Return information about check constraints in `table_name`.
+
+        Given a string `table_name` and an optional string `schema`, return
+        check constraint information as a list of dicts with these keys:
+
+        name
+          the check constraint's name
+
+        sqltext
+          the check constraint's SQL expression
+
+        \**kw
+          other options passed to the dialect's get_check_constraints()
+          method.
+
+        .. versionadded:: 1.1.0
+
+        """
+
+        raise NotImplementedError()
+
     def normalize_name(self, name):
         """convert the given name to lowercase if it is detected as
         case insensitive.
index 2d524978da09eb5c2511c175ddbf6dddd3a15466..14e98cecf15dca4bf045c14d47539e866df25924 100644 (file)
@@ -506,6 +506,32 @@ class Inspector(object):
         return self.dialect.get_unique_constraints(
             self.bind, table_name, schema, info_cache=self.info_cache, **kw)
 
+    def get_check_constraints(self, table_name, schema=None, **kw):
+        """Return information about check constraints in `table_name`.
+
+        Given a string `table_name` and an optional string `schema`, return
+        check constraint information as a list of dicts with these keys:
+
+        name
+          the check constraint's name
+
+        sqltext
+          the check constraint's SQL expression
+
+        :param table_name: string name of the table.  For special quoting,
+         use :class:`.quoted_name`.
+
+        :param schema: string schema name; if omitted, uses the default schema
+         of the database connection.  For special quoting,
+         use :class:`.quoted_name`.
+
+        .. versionadded:: 1.1.0
+
+        """
+
+        return self.dialect.get_check_constraints(
+            self.bind, table_name, schema, info_cache=self.info_cache, **kw)
+
     def reflecttable(self, table, include_columns, exclude_columns=()):
         """Given a Table object, load its internal constructs based on
         introspection.
@@ -586,6 +612,10 @@ class Inspector(object):
             table_name, schema, table, cols_by_orig_name,
             include_columns, exclude_columns, reflection_options)
 
+        self._reflect_check_constraints(
+            table_name, schema, table, cols_by_orig_name,
+            include_columns, exclude_columns, reflection_options)
+
     def _reflect_column(
         self, table, col_d, include_columns,
             exclude_columns, cols_by_orig_name):
@@ -788,3 +818,16 @@ class Inspector(object):
                     constrained_cols.append(constrained_col)
             table.append_constraint(
                 sa_schema.UniqueConstraint(*constrained_cols, name=conname))
+
+    def _reflect_check_constraints(
+            self, table_name, schema, table, cols_by_orig_name,
+            include_columns, exclude_columns, reflection_options):
+        try:
+            constraints = self.get_check_constraints(table_name, schema)
+        except NotImplementedError:
+            # optional dialect feature
+            return
+
+        for const_d in constraints:
+            table.append_constraint(
+                sa_schema.CheckConstraint(**const_d))
index 4897c4a7e2803583bc5edfba529a38030268a22b..a0f9dcd49ac0f2072bc8173d3fde4eb622259e09 100644 (file)
@@ -1,6 +1,7 @@
 # coding: utf-8
 
 from sqlalchemy.engine import reflection
+from sqlalchemy.sql.schema import CheckConstraint
 from sqlalchemy.testing.assertions import eq_, assert_raises, \
     AssertsExecutionResults
 from sqlalchemy.testing import fixtures
@@ -965,6 +966,29 @@ class ReflectionTest(fixtures.TestBase):
         assert indexes['ix_a'].unique
         self.assert_('ix_a' not in constraints)
 
+    @testing.provide_metadata
+    def test_reflect_check_constraint(self):
+        meta = self.metadata
+
+        cc_table = Table(
+            'pgsql_cc', meta,
+            Column('a', Integer()),
+            CheckConstraint('a > 1 AND a < 5', name='cc1'),
+            CheckConstraint('a = 1 OR (a > 2 AND a < 5)', name='cc2'))
+
+        cc_table.create()
+
+        reflected = Table('pgsql_cc', MetaData(testing.db), autoload=True)
+
+        check_constraints = dict((uc.name, uc.sqltext.text)
+                                 for uc in reflected.constraints
+                                 if isinstance(uc, CheckConstraint))
+
+        eq_(check_constraints, {
+            u'cc1': u'(a > 1) AND (a < 5)',
+            u'cc2': u'(a = 1) OR ((a > 2) AND (a < 5))'
+        })
+
 
 class CustomTypeReflectionTest(fixtures.TestBase):
 
index 580950b12f2aa0f68b0d99f6f3d949be575a155d..dde9da0864e3f28460079f46c4e9a9bc7453e3e4 100644 (file)
@@ -1147,6 +1147,13 @@ class ConstraintReflectionTest(fixtures.TestBase):
             # will contain an "autoindex"
             conn.execute("create table o (foo varchar(20) primary key)")
 
+            conn.execute(
+                "CREATE TABLE cp ("
+                "q INTEGER check (q > 1 AND q < 6),\n"
+                "CONSTRAINT cq CHECK (q == 1 OR (q > 2 AND q < 5))\n"
+                ")"
+            )
+
     @classmethod
     def teardown_class(cls):
         with testing.db.begin() as conn:
@@ -1373,6 +1380,14 @@ class ConstraintReflectionTest(fixtures.TestBase):
             {'constrained_columns': [], 'name': None}
         )
 
+    def test_check_constraint(self):
+        inspector = Inspector(testing.db)
+        eq_(
+            inspector.get_check_constraints("cp"),
+            [{'sqltext': 'q > 1 AND q < 6', 'name': None},
+             {'sqltext': 'q == 1 OR (q > 2 AND q < 5)', 'name': 'cq'}]
+        )
+
 
 class SavepointTest(fixtures.TablesTest):
 
index 1dc65d7d04e19df9fe35e5e753c7aef2fac1e9a3..62568eb4a4fd6e8170b0793768d2ec313786e54c 100644 (file)
@@ -981,6 +981,26 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
         assert set([t2.c.name, t2.c.id]) == set(r2.columns)
         assert set([t2.c.name]) == set(r3.columns)
 
+    @testing.requires.check_constraint_reflection
+    @testing.provide_metadata
+    def test_check_constraint_reflection(self):
+        m1 = self.metadata
+        Table(
+            'x', m1,
+            Column('q', Integer),
+            sa.CheckConstraint('q > 10', name="ck1")
+        )
+        m1.create_all()
+        m2 = MetaData(testing.db)
+        t2 = Table('x', m2, autoload=True)
+
+        ck = [
+            const for const in
+            t2.constraints if isinstance(const, sa.CheckConstraint)][0]
+
+        eq_(ck.sqltext.text, "q > 10")
+        eq_(ck.name, "ck1")
+
     @testing.provide_metadata
     def test_index_reflection_cols_busted(self):
         t = Table('x', self.metadata,
index 9e041709cfc7f84abefee1aabde32fe22c279496..0609b3cbf1867f08bfef022f8a40c27438c44710 100644 (file)
@@ -316,6 +316,13 @@ class DefaultRequirements(SuiteRequirements):
                     "sqlite"
                 )
 
+    @property
+    def check_constraint_reflection(self):
+        return fails_on_everything_except(
+                    "postgresql",
+                    "sqlite"
+                )
+
     @property
     def temp_table_names(self):
         """target dialect supports listing of temporary table names"""