]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Oracle reflection of indexes has been tuned so
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 29 Aug 2010 21:06:05 +0000 (17:06 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 29 Aug 2010 21:06:05 +0000 (17:06 -0400)
that indexes which include some or all primary
key columns, but not the same set of columns
as that of the primary key, are reflected.
Indexes which contain the identical columns
as that of the primary key are skipped within
reflection, as the index in that case is assumed
to be the auto-generated primary key index.
Previously, any index with PK columns present
would be skipped.  Thanks to Kent Bower
for the patch.  [ticket:1867]

- Oracle now reflects the names of primary key
constraints - also thanks to Kent Bower.
[ticket:1868]

CHANGES
lib/sqlalchemy/dialects/oracle/base.py
lib/sqlalchemy/test/requires.py
test/dialect/test_oracle.py

diff --git a/CHANGES b/CHANGES
index 056f695199e069cf295d67d804855da98be5741a..98f37a3a62a0712a9ad507d8235cef373029f470 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -226,6 +226,22 @@ CHANGES
   - Added ROWID type to the Oracle dialect, for those
     cases where an explicit CAST might be needed.
     [ticket:1879]
+
+  - Oracle reflection of indexes has been tuned so
+    that indexes which include some or all primary
+    key columns, but not the same set of columns
+    as that of the primary key, are reflected.
+    Indexes which contain the identical columns
+    as that of the primary key are skipped within
+    reflection, as the index in that case is assumed
+    to be the auto-generated primary key index.
+    Previously, any index with PK columns present
+    would be skipped.  Thanks to Kent Bower
+    for the patch.  [ticket:1867]
+
+  - Oracle now reflects the names of primary key
+    constraints - also thanks to Kent Bower.
+    [ticket:1868]
     
 - examples
   - The beaker_caching example has been reorgnized
index 4c153dac29e3ab7b025c6c6150b7367f3a93ae84..0aa348953a10eb9fe49227d02154488207eda3cf 100644 (file)
@@ -899,11 +899,22 @@ class OracleDialect(default.DefaultDialect):
         uniqueness = dict(NONUNIQUE=False, UNIQUE=True)
         
         oracle_sys_col = re.compile(r'SYS_NC\d+\$', re.IGNORECASE)
+
+        def upper_name_set(names):
+            return set([i.upper() for i in names])
+
+        pk_names = upper_name_set(pkeys)
+
+        def remove_if_primary_key(index):
+            # don't include the primary key index
+            if index is not None and \
+               upper_name_set(index['column_names']) == pk_names:
+                indexes.pop()
+
+        index = None
         for rset in rp:
-            # don't include the primary key columns
-            if rset.column_name in [s.upper() for s in pkeys]:
-                continue
             if rset.index_name != last_index_name:
+                remove_if_primary_key(index)
                 index = dict(name=self.normalize_name(rset.index_name), column_names=[])
                 indexes.append(index)
             index['unique'] = uniqueness.get(rset.uniqueness, False)
@@ -913,6 +924,7 @@ class OracleDialect(default.DefaultDialect):
             if not oracle_sys_col.match(rset.column_name):
                 index['column_names'].append(self.normalize_name(rset.column_name))
             last_index_name = rset.index_name
+        remove_if_primary_key(index)
         return indexes
 
     @reflection.cache
@@ -945,7 +957,6 @@ class OracleDialect(default.DefaultDialect):
         constraint_data = rp.fetchall()
         return constraint_data
 
-    @reflection.cache
     def get_primary_keys(self, connection, table_name, schema=None, **kw):
         """
 
@@ -956,7 +967,10 @@ class OracleDialect(default.DefaultDialect):
             dblink
 
         """
+        return self._get_primary_keys(connection, table_name, schema, **kw)[0]
 
+    @reflection.cache
+    def _get_primary_keys(self, connection, table_name, schema=None, **kw):
         resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
         dblink = kw.get('dblink', '')
         info_cache = kw.get('info_cache')
@@ -966,6 +980,7 @@ class OracleDialect(default.DefaultDialect):
                                           resolve_synonyms, dblink,
                                           info_cache=info_cache)
         pkeys = []
+        constraint_name = None
         constraint_data = self._get_constraint_data(connection, table_name,
                                         schema, dblink,
                                         info_cache=kw.get('info_cache'))
@@ -975,8 +990,18 @@ class OracleDialect(default.DefaultDialect):
             (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = \
                 row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
             if cons_type == 'P':
+                if constraint_name is None:
+                    constraint_name = self.normalize_name(cons_name)
                 pkeys.append(local_column)
-        return pkeys
+        return pkeys, constraint_name
+
+    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
+        cols, name = self._get_primary_keys(connection, table_name, schema=schema, **kw)
+
+        return {
+            'constrained_columns':cols,
+            'name':name
+        }
 
     @reflection.cache
     def get_foreign_keys(self, connection, table_name, schema=None, **kw):
index fefb00330abf56cdb0bc3ccb153551d11e56e452..501f0e24d9a7fdbbedcf570a1c94b0b8fa3dafa5 100644 (file)
@@ -257,7 +257,7 @@ def reflects_pk_names(fn):
     """Target driver reflects the name of primary key constraints."""
     return _chain_decorators_on(
         fn,
-        fails_on_everything_except('postgresql')
+        fails_on_everything_except('postgresql', 'oracle')
     )
     
 def python2(fn):
index 384066c41f626b4fc712c0435e39d252a3112019..29d18b988c6a8ac6c899f63f76c251488419491f 100644 (file)
@@ -1121,7 +1121,80 @@ class UnsupportedIndexReflectTest(TestBase):
                            'TEST_INDEX_REFLECT (UPPER(DATA))')
         m2 = MetaData(testing.db)
         t2 = Table('test_index_reflect', m2, autoload=True)
-        
+    
+class RoundTripIndexTest(TestBase):
+    __only_on__ = 'oracle'
+
+    def test_basic(self):
+        engine = testing.db
+        metadata = MetaData(engine)
+
+        table=Table("sometable", metadata,
+            Column("id_a", Unicode(255), primary_key=True),
+            Column("id_b", Unicode(255), primary_key=True, unique=True),
+            Column("group", Unicode(255), primary_key=True),
+            Column("col", Unicode(255)),
+            UniqueConstraint('col','group'),
+        )
+
+        # "group" is a keyword, so lower case
+        normalind = Index('tableind', table.c.id_b, table.c.group) 
+
+        # create
+        metadata.create_all()
+        try:
+            # round trip, create from reflection
+            mirror = MetaData(engine)
+            mirror.reflect()
+            metadata.drop_all()
+            mirror.create_all()
+
+            # inspect the reflected creation
+            inspect = MetaData(engine)
+            inspect.reflect()
+
+            def obj_definition(obj):
+                return obj.__class__, tuple([c.name for c in
+                        obj.columns]), getattr(obj, 'unique', None)
+
+            # find what the primary k constraint name should be
+            primaryconsname = engine.execute(
+               text("""SELECT constraint_name
+                   FROM all_constraints
+                   WHERE table_name = :table_name
+                   AND owner = :owner
+                   AND constraint_type = 'P' """),
+               table_name=table.name.upper(),
+               owner=engine.url.username.upper()).fetchall()[0][0]
+
+            reflectedtable = inspect.tables[table.name]
+
+            # make a dictionary of the reflected objects:
+
+            reflected = dict([(obj_definition(i), i) for i in
+                             reflectedtable.indexes
+                             | reflectedtable.constraints])
+
+            # assert we got primary key constraint and its name, Error
+            # if not in dict
+
+            assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b',
+                             'group'), None)].name.upper() \
+                == primaryconsname.upper()
+
+            # Error if not in dict
+
+            assert reflected[(Index, ('id_b', 'group'), False)].name \
+                == normalind.name
+            assert (Index, ('id_b', ), True) in reflected
+            assert (Index, ('col', 'group'), True) in reflected
+            assert len(reflectedtable.constraints) == 1
+            assert len(reflectedtable.indexes) == 3
+
+        finally:
+            metadata.drop_all()
+
+
         
 class SequenceTest(TestBase, AssertsCompiledSQL):