.. changelog::
:version: 0.9.0
+ .. change::
+ :tags: feature, sql
+
+ Added support for "unique constraint" reflection, via the
+ :meth:`.Inspector.get_unique_constraints` method.
+ Thanks for Roman Podolyaka for the patch.
+
.. change::
:tags: feature, pool
:tickets: 2752
\**kw
other options passed to the dialect's get_unique_constraints() method.
+ .. versionadded:: 0.9.0
+
"""
raise NotImplementedError()
\**kw
other options passed to the dialect's get_unique_constraints() method.
+ .. versionadded:: 0.9.0
+
"""
return self.dialect.get_unique_constraints(
def index_reflection(self):
return exclusions.open()
+ @property
+ def unique_constraint_reflection(self):
+ """target dialect supports reflection of unique constraints"""
+ return exclusions.open()
+
@property
def unbounded_varchar(self):
"""Target database must support VARCHAR with no length"""
from sqlalchemy.testing import eq_, assert_raises_message
from sqlalchemy import testing
from .. import config
-
+import operator
from sqlalchemy.schema import DDL, Index
from sqlalchemy import event
def test_get_indexes_with_schema(self):
self._test_get_indexes(schema='test_schema')
+
+ @testing.requires.unique_constraint_reflection
+ def test_get_unique_constraints(self):
+ self._test_get_unique_constraints()
+
+ @testing.requires.unique_constraint_reflection
+ @testing.requires.schemas
+ def test_get_unique_constraints_with_schema(self):
+ self._test_get_unique_constraints(schema='test_schema')
+
+ @testing.provide_metadata
+ def _test_get_unique_constraints(self, schema=None):
+ uniques = sorted(
+ [
+ {'name': 'unique_a_b_c', 'column_names': ['a', 'b', 'c']},
+ {'name': 'unique_a_c', 'column_names': ['a', 'c']},
+ {'name': 'unique_b_c', 'column_names': ['b', 'c']},
+ ],
+ key=operator.itemgetter('name')
+ )
+ orig_meta = self.metadata
+ table = Table(
+ 'testtbl', orig_meta,
+ Column('a', sa.String(20)),
+ Column('b', sa.String(30)),
+ Column('c', sa.Integer),
+ schema=schema
+ )
+ for uc in uniques:
+ table.append_constraint(
+ sa.UniqueConstraint(*uc['column_names'], name=uc['name'])
+ )
+ orig_meta.create_all()
+
+ inspector = inspect(orig_meta.bind)
+ reflected = sorted(
+ inspector.get_unique_constraints('testtbl', schema=schema),
+ key=operator.itemgetter('name')
+ )
+
+ eq_(uniques, reflected)
+
+
@testing.provide_metadata
def _test_get_view_definition(self, schema=None):
meta = self.metadata
assert set([t2.c.name, t2.c.id]) == set(r2.columns)
assert set([t2.c.name]) == set(r3.columns)
- @testing.provide_metadata
- def test_unique_constraints_reflection(self):
- uniques = sorted(
- [
- {'name': 'unique_a_b_c', 'column_names': ['a', 'b', 'c']},
- {'name': 'unique_a_c', 'column_names': ['a', 'c']},
- {'name': 'unique_b_c', 'column_names': ['b', 'c']},
- ],
- key=operator.itemgetter('name')
- )
-
- try:
- orig_meta = sa.MetaData(bind=testing.db)
- table = Table(
- 'testtbl', orig_meta,
- Column('a', sa.String(20)),
- Column('b', sa.String(30)),
- Column('c', sa.Integer),
- )
- for uc in uniques:
- table.append_constraint(
- sa.UniqueConstraint(*uc['column_names'], name=uc['name'])
- )
- orig_meta.create_all()
-
- inspector = inspect(testing.db)
- reflected = sorted(
- inspector.get_unique_constraints('testtbl'),
- key=operator.itemgetter('name')
- )
-
- assert uniques == reflected
- finally:
- testing.db.execute('drop table if exists testtbl;')
@testing.requires.views
@testing.provide_metadata
"postgresql"
])
+ @property
+ def unique_constraint_reflection(self):
+ return fails_on_everything_except(
+ "postgresql",
+ "mysql",
+ "sqlite"
+ )
+
@property
def update_nowait(self):
"""Target database must support SELECT...FOR UPDATE NOWAIT"""