.. changelog::
:version: 0.8.4
+ .. change::
+ :tags: feature, sql
+ :tickets: 1443
+ :versions: 0.9.0b1
+
+ Added support for "unique constraint" reflection, via the
+ :meth:`.Inspector.get_unique_constraints` method.
+ Thanks for Roman Podolyaka for the patch.
+
.. change::
:tags: bug, oracle
:tickets: 2864
indexes.append(index_d)
return indexes
+ @reflection.cache
+ def get_unique_constraints(self, connection, table_name,
+ schema=None, **kw):
+ parsed_state = self._parsed_state_or_create(
+ connection, table_name, schema, **kw)
+
+ return [
+ {
+ 'name': key['name'],
+ 'column_names': [col[0] for col in key['columns']]
+ }
+ for key in parsed_state.keys
+ if key['type'] == 'UNIQUE'
+ ]
+
@reflection.cache
def get_view_definition(self, connection, view_name, schema=None, **kw):
for name, idx in indexes.items()
]
+ @reflection.cache
+ def get_unique_constraints(self, connection, table_name,
+ schema=None, **kw):
+ table_oid = self.get_table_oid(connection, table_name, schema,
+ info_cache=kw.get('info_cache'))
+
+ UNIQUE_SQL = """
+ SELECT
+ cons.conname as name,
+ ARRAY_AGG(a.attname) as column_names
+ FROM
+ pg_catalog.pg_constraint cons
+ left outer join pg_attribute a
+ on cons.conrelid = a.attrelid and a.attnum = ANY(cons.conkey)
+ WHERE
+ cons.conrelid = :table_oid AND
+ cons.contype = 'u'
+ GROUP BY
+ cons.conname
+ """
+
+ t = sql.text(UNIQUE_SQL,
+ typemap={'column_names': ARRAY(sqltypes.Unicode)})
+ c = connection.execute(t, table_oid=table_oid)
+
+ return [
+ {'name': row.name, 'column_names': row.column_names}
+ for row in c.fetchall()
+ ]
+
def _load_enums(self, connection):
if not self.supports_native_enum:
return {}
cols.append(row[2])
return indexes
+ @reflection.cache
+ def get_unique_constraints(self, connection, table_name,
+ schema=None, **kw):
+ UNIQUE_SQL = """
+ SELECT sql
+ FROM
+ sqlite_master
+ WHERE
+ type='table' AND
+ name=:table_name
+ """
+ c = connection.execute(UNIQUE_SQL, table_name=table_name)
+ table_data = c.fetchone()[0]
+
+ UNIQUE_PATTERN = 'CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)'
+ return [
+ {'name': name, 'column_names': [c.strip() for c in cols.split(',')]}
+ for name, cols in re.findall(UNIQUE_PATTERN, table_data)
+ ]
+
def _pragma_cursor(cursor):
"""work around SQLite issue whereby cursor.description
raise NotImplementedError()
+ def get_unique_constraints(self, table_name, schema=None, **kw):
+ """Return information about unique constraints in `table_name`.
+
+ Given a string `table_name` and an optional string `schema`, return
+ unique constraint information as a list of dicts with these keys:
+
+ name
+ the unique constraint's name
+
+ column_names
+ list of column names in order
+
+
+ """
+
+ raise NotImplementedError()
+
def normalize_name(self, name):
"""convert the given name to lowercase if it is detected as
case insensitive.
name
optional name of the foreign key constraint.
- \**kw
- other options passed to the dialect's get_foreign_keys() method.
-
"""
return self.dialect.get_foreign_keys(self.bind, table_name, schema,
unique
boolean
- \**kw
- other options passed to the dialect's get_indexes() method.
"""
return self.dialect.get_indexes(self.bind, table_name,
schema,
info_cache=self.info_cache, **kw)
+ def get_unique_constraints(self, table_name, schema=None, **kw):
+ """Return information about unique constraints in `table_name`.
+
+ Given a string `table_name` and an optional string `schema`, return
+ unique constraint information as a list of dicts with these keys:
+
+ name
+ the unique constraint's name
+
+ column_names
+ list of column names in order
+
+ .. versionadded:: 0.8.4
+
+ """
+
+ return self.dialect.get_unique_constraints(
+ self.bind, table_name, schema, info_cache=self.info_cache, **kw)
+
def reflecttable(self, table, include_columns, exclude_columns=()):
"""Given a Table object, load its internal constructs based on
introspection.
+import operator
+
import unicodedata
import sqlalchemy as sa
from sqlalchemy import schema, events, event, inspect
assert set([t2.c.name, t2.c.id]) == set(r2.columns)
assert set([t2.c.name]) == set(r3.columns)
+ @testing.provide_metadata
+ def test_unique_constraints_reflection(self):
+ uniques = sorted(
+ [
+ {'name': 'unique_a_b_c', 'column_names': ['a', 'b', 'c']},
+ {'name': 'unique_a_c', 'column_names': ['a', 'c']},
+ {'name': 'unique_b_c', 'column_names': ['b', 'c']},
+ ],
+ key=operator.itemgetter('name')
+ )
+
+ try:
+ orig_meta = sa.MetaData(bind=testing.db)
+ table = Table(
+ 'testtbl', orig_meta,
+ Column('a', sa.String(20)),
+ Column('b', sa.String(30)),
+ Column('c', sa.Integer),
+ )
+ for uc in uniques:
+ table.append_constraint(
+ sa.UniqueConstraint(*uc['column_names'], name=uc['name'])
+ )
+ orig_meta.create_all()
+
+ inspector = inspect(testing.db)
+ reflected = sorted(
+ inspector.get_unique_constraints('testtbl'),
+ key=operator.itemgetter('name')
+ )
+
+ assert uniques == reflected
+ finally:
+ testing.db.execute('drop table if exists testtbl;')
+
@testing.requires.views
@testing.provide_metadata
def test_views(self):