From 36fa24603f20ff6afc537cc97310d90efc667959 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sun, 29 Aug 2010 17:06:05 -0400 Subject: [PATCH] - Oracle reflection of indexes has been tuned so that indexes which include some or all primary key columns, but not the same set of columns as that of the primary key, are reflected. Indexes which contain the identical columns as that of the primary key are skipped within reflection, as the index in that case is assumed to be the auto-generated primary key index. Previously, any index with PK columns present would be skipped. Thanks to Kent Bower for the patch. [ticket:1867] - Oracle now reflects the names of primary key constraints - also thanks to Kent Bower. [ticket:1868] --- CHANGES | 16 ++++++ lib/sqlalchemy/dialects/oracle/base.py | 35 ++++++++++-- lib/sqlalchemy/test/requires.py | 2 +- test/dialect/test_oracle.py | 75 +++++++++++++++++++++++++- 4 files changed, 121 insertions(+), 7 deletions(-) diff --git a/CHANGES b/CHANGES index 056f695199..98f37a3a62 100644 --- a/CHANGES +++ b/CHANGES @@ -226,6 +226,22 @@ CHANGES - Added ROWID type to the Oracle dialect, for those cases where an explicit CAST might be needed. [ticket:1879] + + - Oracle reflection of indexes has been tuned so + that indexes which include some or all primary + key columns, but not the same set of columns + as that of the primary key, are reflected. + Indexes which contain the identical columns + as that of the primary key are skipped within + reflection, as the index in that case is assumed + to be the auto-generated primary key index. + Previously, any index with PK columns present + would be skipped. Thanks to Kent Bower + for the patch. [ticket:1867] + + - Oracle now reflects the names of primary key + constraints - also thanks to Kent Bower. + [ticket:1868] - examples - The beaker_caching example has been reorgnized diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py index 4c153dac29..0aa348953a 100644 --- a/lib/sqlalchemy/dialects/oracle/base.py +++ b/lib/sqlalchemy/dialects/oracle/base.py @@ -899,11 +899,22 @@ class OracleDialect(default.DefaultDialect): uniqueness = dict(NONUNIQUE=False, UNIQUE=True) oracle_sys_col = re.compile(r'SYS_NC\d+\$', re.IGNORECASE) + + def upper_name_set(names): + return set([i.upper() for i in names]) + + pk_names = upper_name_set(pkeys) + + def remove_if_primary_key(index): + # don't include the primary key index + if index is not None and \ + upper_name_set(index['column_names']) == pk_names: + indexes.pop() + + index = None for rset in rp: - # don't include the primary key columns - if rset.column_name in [s.upper() for s in pkeys]: - continue if rset.index_name != last_index_name: + remove_if_primary_key(index) index = dict(name=self.normalize_name(rset.index_name), column_names=[]) indexes.append(index) index['unique'] = uniqueness.get(rset.uniqueness, False) @@ -913,6 +924,7 @@ class OracleDialect(default.DefaultDialect): if not oracle_sys_col.match(rset.column_name): index['column_names'].append(self.normalize_name(rset.column_name)) last_index_name = rset.index_name + remove_if_primary_key(index) return indexes @reflection.cache @@ -945,7 +957,6 @@ class OracleDialect(default.DefaultDialect): constraint_data = rp.fetchall() return constraint_data - @reflection.cache def get_primary_keys(self, connection, table_name, schema=None, **kw): """ @@ -956,7 +967,10 @@ class OracleDialect(default.DefaultDialect): dblink """ + return self._get_primary_keys(connection, table_name, schema, **kw)[0] + @reflection.cache + def _get_primary_keys(self, connection, table_name, schema=None, **kw): resolve_synonyms = kw.get('oracle_resolve_synonyms', False) dblink = kw.get('dblink', '') info_cache = kw.get('info_cache') @@ -966,6 +980,7 @@ class OracleDialect(default.DefaultDialect): resolve_synonyms, dblink, info_cache=info_cache) pkeys = [] + constraint_name = None constraint_data = self._get_constraint_data(connection, table_name, schema, dblink, info_cache=kw.get('info_cache')) @@ -975,8 +990,18 @@ class OracleDialect(default.DefaultDialect): (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = \ row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]]) if cons_type == 'P': + if constraint_name is None: + constraint_name = self.normalize_name(cons_name) pkeys.append(local_column) - return pkeys + return pkeys, constraint_name + + def get_pk_constraint(self, connection, table_name, schema=None, **kw): + cols, name = self._get_primary_keys(connection, table_name, schema=schema, **kw) + + return { + 'constrained_columns':cols, + 'name':name + } @reflection.cache def get_foreign_keys(self, connection, table_name, schema=None, **kw): diff --git a/lib/sqlalchemy/test/requires.py b/lib/sqlalchemy/test/requires.py index fefb00330a..501f0e24d9 100644 --- a/lib/sqlalchemy/test/requires.py +++ b/lib/sqlalchemy/test/requires.py @@ -257,7 +257,7 @@ def reflects_pk_names(fn): """Target driver reflects the name of primary key constraints.""" return _chain_decorators_on( fn, - fails_on_everything_except('postgresql') + fails_on_everything_except('postgresql', 'oracle') ) def python2(fn): diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index 384066c41f..29d18b988c 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -1121,7 +1121,80 @@ class UnsupportedIndexReflectTest(TestBase): 'TEST_INDEX_REFLECT (UPPER(DATA))') m2 = MetaData(testing.db) t2 = Table('test_index_reflect', m2, autoload=True) - + +class RoundTripIndexTest(TestBase): + __only_on__ = 'oracle' + + def test_basic(self): + engine = testing.db + metadata = MetaData(engine) + + table=Table("sometable", metadata, + Column("id_a", Unicode(255), primary_key=True), + Column("id_b", Unicode(255), primary_key=True, unique=True), + Column("group", Unicode(255), primary_key=True), + Column("col", Unicode(255)), + UniqueConstraint('col','group'), + ) + + # "group" is a keyword, so lower case + normalind = Index('tableind', table.c.id_b, table.c.group) + + # create + metadata.create_all() + try: + # round trip, create from reflection + mirror = MetaData(engine) + mirror.reflect() + metadata.drop_all() + mirror.create_all() + + # inspect the reflected creation + inspect = MetaData(engine) + inspect.reflect() + + def obj_definition(obj): + return obj.__class__, tuple([c.name for c in + obj.columns]), getattr(obj, 'unique', None) + + # find what the primary k constraint name should be + primaryconsname = engine.execute( + text("""SELECT constraint_name + FROM all_constraints + WHERE table_name = :table_name + AND owner = :owner + AND constraint_type = 'P' """), + table_name=table.name.upper(), + owner=engine.url.username.upper()).fetchall()[0][0] + + reflectedtable = inspect.tables[table.name] + + # make a dictionary of the reflected objects: + + reflected = dict([(obj_definition(i), i) for i in + reflectedtable.indexes + | reflectedtable.constraints]) + + # assert we got primary key constraint and its name, Error + # if not in dict + + assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', + 'group'), None)].name.upper() \ + == primaryconsname.upper() + + # Error if not in dict + + assert reflected[(Index, ('id_b', 'group'), False)].name \ + == normalind.name + assert (Index, ('id_b', ), True) in reflected + assert (Index, ('col', 'group'), True) in reflected + assert len(reflectedtable.constraints) == 1 + assert len(reflectedtable.indexes) == 3 + + finally: + metadata.drop_all() + + class SequenceTest(TestBase, AssertsCompiledSQL): -- 2.47.2