- Added ROWID type to the Oracle dialect, for those
cases where an explicit CAST might be needed.
[ticket:1879]
+
+ - Oracle reflection of indexes has been tuned so
+ that indexes which include some or all primary
+ key columns, but not the same set of columns
+ as that of the primary key, are reflected.
+ Indexes which contain the identical columns
+ as that of the primary key are skipped within
+ reflection, as the index in that case is assumed
+ to be the auto-generated primary key index.
+ Previously, any index with PK columns present
+ would be skipped. Thanks to Kent Bower
+ for the patch. [ticket:1867]
+
+ - Oracle now reflects the names of primary key
+ constraints - also thanks to Kent Bower.
+ [ticket:1868]
- examples
- The beaker_caching example has been reorgnized
uniqueness = dict(NONUNIQUE=False, UNIQUE=True)
oracle_sys_col = re.compile(r'SYS_NC\d+\$', re.IGNORECASE)
+
+ def upper_name_set(names):
+ return set([i.upper() for i in names])
+
+ pk_names = upper_name_set(pkeys)
+
+ def remove_if_primary_key(index):
+ # don't include the primary key index
+ if index is not None and \
+ upper_name_set(index['column_names']) == pk_names:
+ indexes.pop()
+
+ index = None
for rset in rp:
- # don't include the primary key columns
- if rset.column_name in [s.upper() for s in pkeys]:
- continue
if rset.index_name != last_index_name:
+ remove_if_primary_key(index)
index = dict(name=self.normalize_name(rset.index_name), column_names=[])
indexes.append(index)
index['unique'] = uniqueness.get(rset.uniqueness, False)
if not oracle_sys_col.match(rset.column_name):
index['column_names'].append(self.normalize_name(rset.column_name))
last_index_name = rset.index_name
+ remove_if_primary_key(index)
return indexes
@reflection.cache
constraint_data = rp.fetchall()
return constraint_data
- @reflection.cache
def get_primary_keys(self, connection, table_name, schema=None, **kw):
"""
dblink
"""
+ return self._get_primary_keys(connection, table_name, schema, **kw)[0]
+ @reflection.cache
+ def _get_primary_keys(self, connection, table_name, schema=None, **kw):
resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
dblink = kw.get('dblink', '')
info_cache = kw.get('info_cache')
resolve_synonyms, dblink,
info_cache=info_cache)
pkeys = []
+ constraint_name = None
constraint_data = self._get_constraint_data(connection, table_name,
schema, dblink,
info_cache=kw.get('info_cache'))
(cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = \
row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
if cons_type == 'P':
+ if constraint_name is None:
+ constraint_name = self.normalize_name(cons_name)
pkeys.append(local_column)
- return pkeys
+ return pkeys, constraint_name
+
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
+ cols, name = self._get_primary_keys(connection, table_name, schema=schema, **kw)
+
+ return {
+ 'constrained_columns':cols,
+ 'name':name
+ }
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
"""Target driver reflects the name of primary key constraints."""
return _chain_decorators_on(
fn,
- fails_on_everything_except('postgresql')
+ fails_on_everything_except('postgresql', 'oracle')
)
def python2(fn):
'TEST_INDEX_REFLECT (UPPER(DATA))')
m2 = MetaData(testing.db)
t2 = Table('test_index_reflect', m2, autoload=True)
-
+
+class RoundTripIndexTest(TestBase):
+ __only_on__ = 'oracle'
+
+ def test_basic(self):
+ engine = testing.db
+ metadata = MetaData(engine)
+
+ table=Table("sometable", metadata,
+ Column("id_a", Unicode(255), primary_key=True),
+ Column("id_b", Unicode(255), primary_key=True, unique=True),
+ Column("group", Unicode(255), primary_key=True),
+ Column("col", Unicode(255)),
+ UniqueConstraint('col','group'),
+ )
+
+ # "group" is a keyword, so lower case
+ normalind = Index('tableind', table.c.id_b, table.c.group)
+
+ # create
+ metadata.create_all()
+ try:
+ # round trip, create from reflection
+ mirror = MetaData(engine)
+ mirror.reflect()
+ metadata.drop_all()
+ mirror.create_all()
+
+ # inspect the reflected creation
+ inspect = MetaData(engine)
+ inspect.reflect()
+
+ def obj_definition(obj):
+ return obj.__class__, tuple([c.name for c in
+ obj.columns]), getattr(obj, 'unique', None)
+
+ # find what the primary k constraint name should be
+ primaryconsname = engine.execute(
+ text("""SELECT constraint_name
+ FROM all_constraints
+ WHERE table_name = :table_name
+ AND owner = :owner
+ AND constraint_type = 'P' """),
+ table_name=table.name.upper(),
+ owner=engine.url.username.upper()).fetchall()[0][0]
+
+ reflectedtable = inspect.tables[table.name]
+
+ # make a dictionary of the reflected objects:
+
+ reflected = dict([(obj_definition(i), i) for i in
+ reflectedtable.indexes
+ | reflectedtable.constraints])
+
+ # assert we got primary key constraint and its name, Error
+ # if not in dict
+
+ assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b',
+ 'group'), None)].name.upper() \
+ == primaryconsname.upper()
+
+ # Error if not in dict
+
+ assert reflected[(Index, ('id_b', 'group'), False)].name \
+ == normalind.name
+ assert (Index, ('id_b', ), True) in reflected
+ assert (Index, ('col', 'group'), True) in reflected
+ assert len(reflectedtable.constraints) == 1
+ assert len(reflectedtable.indexes) == 3
+
+ finally:
+ metadata.drop_all()
+
+
class SequenceTest(TestBase, AssertsCompiledSQL):