oracle_sys_col = re.compile(r'SYS_NC\d+\$', re.IGNORECASE)
- def upper_name_set(names):
- return {i.upper() for i in names}
-
- pk_names = upper_name_set(pkeys)
-
- def remove_if_primary_key(index):
- # don't include the primary key index
- if index is not None and \
- upper_name_set(index['column_names']) == pk_names:
- indexes.pop()
-
index = None
for rset in rp:
if rset.index_name != last_index_name:
- remove_if_primary_key(index)
index = dict(name=self.normalize_name(rset.index_name),
column_names=[], dialect_options={})
indexes.append(index)
index['column_names'].append(
self.normalize_name(rset.column_name))
last_index_name = rset.index_name
- remove_if_primary_key(index)
+
+ def upper_name_set(names):
+ return {i.upper() for i in names}
+
+ pk_names = upper_name_set(pkeys)
+ if pk_names:
+ def is_pk_index(index):
+ # don't include the primary key index
+ return upper_name_set(index['column_names']) == pk_names
+ indexes = [idx for idx in indexes if not is_pk_index(idx)]
+
return indexes
@reflection.cache
from sqlalchemy import exc as sa_exc
from sqlalchemy import types as sql_types
from sqlalchemy import inspect
-from sqlalchemy import MetaData, Integer, String
+from sqlalchemy import MetaData, Integer, String, func
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.testing import engines, fixtures
from sqlalchemy.testing.schema import Table, Column
-from sqlalchemy.testing import eq_, assert_raises_message
+from sqlalchemy.testing import eq_, is_, assert_raises_message
from sqlalchemy import testing
from .. import config
import operator
if testing.requires.index_reflection.enabled:
cls.define_index(metadata, users)
+
+ if not schema:
+ noncol_idx_test_nopk = Table(
+ 'noncol_idx_test_nopk', metadata,
+ Column('q', sa.String(5)),
+ )
+ noncol_idx_test_pk = Table(
+ 'noncol_idx_test_pk', metadata,
+ Column('id', sa.Integer, primary_key=True),
+ Column('q', sa.String(5)),
+ )
+ Index('noncol_idx_nopk', noncol_idx_test_nopk.c.q.desc())
+ Index('noncol_idx_pk', noncol_idx_test_pk.c.q.desc())
+
if testing.requires.view_column_reflection.enabled:
cls.define_views(metadata, schema)
if not schema and testing.requires.temp_table_reflection.enabled:
@testing.provide_metadata
def _test_get_table_names(self, schema=None, table_type='table',
order_by=None):
+ _ignore_tables = [
+ 'comment_test', 'noncol_idx_test_pk', 'noncol_idx_test_nopk'
+ ]
meta = self.metadata
users, addresses, dingalings = self.tables.users, \
self.tables.email_addresses, self.tables.dingalings
table_names = [
t for t in insp.get_table_names(
schema,
- order_by=order_by) if t not in ('comment_test', )]
+ order_by=order_by) if t not in _ignore_tables]
if order_by == 'foreign_key':
answer = ['users', 'email_addresses', 'dingalings']
{'onupdate': 'SET NULL', 'ondelete': 'CASCADE'}
)
+ def _assert_insp_indexes(self, indexes, expected_indexes):
+ index_names = [d['name'] for d in indexes]
+ for e_index in expected_indexes:
+ assert e_index['name'] in index_names
+ index = indexes[index_names.index(e_index['name'])]
+ for key in e_index:
+ eq_(e_index[key], index[key])
+
@testing.provide_metadata
def _test_get_indexes(self, schema=None):
meta = self.metadata
'column_names': ['user_id', 'test2', 'test1'],
'name': 'users_all_idx'}
]
- index_names = [d['name'] for d in indexes]
- for e_index in expected_indexes:
- assert e_index['name'] in index_names
- index = indexes[index_names.index(e_index['name'])]
- for key in e_index:
- eq_(e_index[key], index[key])
+ self._assert_insp_indexes(indexes, expected_indexes)
@testing.requires.index_reflection
def test_get_indexes(self):
def test_get_indexes_with_schema(self):
self._test_get_indexes(schema=testing.config.test_schema)
+ @testing.provide_metadata
+ def _test_get_noncol_index(self, tname, ixname):
+ meta = self.metadata
+ insp = inspect(meta.bind)
+ indexes = insp.get_indexes(tname)
+
+ # reflecting an index that has "x DESC" in it as the column.
+ # the DB may or may not give us "x", but make sure we get the index
+ # back, it has a name, it's connected to the table.
+ expected_indexes = [
+ {'unique': False,
+ 'name': ixname}
+ ]
+ self._assert_insp_indexes(indexes, expected_indexes)
+
+ t = Table(tname, meta, autoload_with=meta.bind)
+ eq_(len(t.indexes), 1)
+ is_(list(t.indexes)[0].table, t)
+ eq_(list(t.indexes)[0].name, ixname)
+
+ @testing.requires.index_reflection
+ def test_get_noncol_index_no_pk(self):
+ self._test_get_noncol_index("noncol_idx_test_nopk", "noncol_idx_nopk")
+
+ @testing.requires.index_reflection
+ def test_get_noncol_index_pk(self):
+ self._test_get_noncol_index("noncol_idx_test_pk", "noncol_idx_pk")
+
@testing.requires.unique_constraint_reflection
def test_get_unique_constraints(self):
self._test_get_unique_constraints()
)
-class ColumnCollectionTest(fixtures.TestBase):
+class ColumnCollectionTest(testing.AssertsCompiledSQL, fixtures.TestBase):
def test_in(self):
cc = sql.ColumnCollection()
eq_(ci._all_columns, [c1, c2a, c3, c2b])
eq_(list(ci), [c1, c2b, c3])
+ def test_identical_dupe_add(self):
+ cc = sql.ColumnCollection()
+
+ c1, c2, c3 = (column('c1'),
+ column('c2'),
+ column('c3'))
+
+ cc.add(c1)
+ cc.add(c2)
+ cc.add(c3)
+ cc.add(c2)
+
+ eq_(cc._all_columns, [c1, c2, c3])
+
+ self.assert_compile(
+ cc == [c1, c2, c3],
+ "c1 = c1 AND c2 = c2 AND c3 = c3"
+ )
+
+ # for iter, c2a is replaced by c2b, ordering
+ # is maintained in that way. ideally, iter would be
+ # the same as the "_all_columns" collection.
+ eq_(list(cc), [c1, c2, c3])
+
+ assert cc.contains_column(c2)
+
+ ci = cc.as_immutable()
+ eq_(ci._all_columns, [c1, c2, c3])
+ eq_(list(ci), [c1, c2, c3])
+
+ self.assert_compile(
+ ci == [c1, c2, c3],
+ "c1 = c1 AND c2 = c2 AND c3 = c3"
+ )
+
def test_replace(self):
cc = sql.ColumnCollection()