# table attributes we might need.
reflection_options = dict(
- (k, table.kwargs.get(k)) for k in dialect.reflection_options if k in table.kwargs)
+ (k, table.kwargs.get(k))
+ for k in dialect.reflection_options if k in table.kwargs)
schema = table.schema
table_name = table.name
# columns
found_table = False
+ cols_by_orig_name = {}
+
for col_d in self.get_columns(table_name, schema, **tblkw):
found_table = True
+ orig_name = col_d['name']
+
table.dispatch.column_reflect(self, table, col_d)
name = col_d['name']
sequence.increment = seq['increment']
colargs.append(sequence)
- col = sa_schema.Column(name, coltype, *colargs, **col_kw)
+ cols_by_orig_name[orig_name] = col = \
+ sa_schema.Column(name, coltype, *colargs, **col_kw)
+
table.append_column(col)
if not found_table:
pk_cons = self.get_pk_constraint(table_name, schema, **tblkw)
if pk_cons:
pk_cols = [
- table.c[pk]
+ cols_by_orig_name[pk]
for pk in pk_cons['constrained_columns']
- if pk in table.c and pk not in exclude_columns
+ if pk in cols_by_orig_name and pk not in exclude_columns
]
pk_cols += [
pk
fkeys = self.get_foreign_keys(table_name, schema, **tblkw)
for fkey_d in fkeys:
conname = fkey_d['name']
- constrained_columns = fkey_d['constrained_columns']
+ constrained_columns = [
+ cols_by_orig_name[c].key
+ if c in cols_by_orig_name else c
+ for c in fkey_d['constrained_columns']
+ ]
if exclude_columns and set(constrained_columns).intersection(
exclude_columns):
continue
"Omitting %s KEY for (%s), key covers omitted columns." %
(flavor, ', '.join(columns)))
continue
- sa_schema.Index(name, *[table.columns[c] for c in columns],
+ sa_schema.Index(name, *[cols_by_orig_name[c] for c in columns],
**dict(unique=unique))
cls.metadata,
Column('x', sa.Integer, primary_key=True),
)
+ cls.related = Table(
+ 'related',
+ cls.metadata,
+ Column('q', sa.Integer, sa.ForeignKey('to_reflect.x'))
+ )
+ sa.Index("some_index", cls.to_reflect.c.x)
cls.metadata.create_all(testing.db)
@classmethod
def teardown(self):
events.SchemaEventTarget.dispatch._clear()
- def _do_test(self, col, update, assert_):
+ def _do_test(self, col, update, assert_, tablename="to_reflect"):
# load the actual Table class, not the test
# wrapper
from sqlalchemy.schema import Table
if column_info['name'] == col:
column_info.update(update)
- t = Table('to_reflect', m, autoload=True, listeners=[
+ t = Table(tablename, m, autoload=True, listeners=[
('column_reflect', column_reflect),
])
assert_(t)
m = MetaData(testing.db)
event.listen(Table, 'column_reflect', column_reflect)
- t2 = Table('to_reflect', m, autoload=True)
+ t2 = Table(tablename, m, autoload=True)
assert_(t2)
def test_override_key(self):
+ def assertions(table):
+ eq_(table.c.YXZ.name, "x")
+ eq_(set(table.primary_key), set([table.c.YXZ]))
+ idx = list(table.indexes)[0]
+ eq_(idx.columns, [table.c.YXZ])
+
self._do_test(
"x", {"key": "YXZ"},
- lambda table: eq_(table.c.YXZ.name, "x")
+ assertions
)
+ def test_override_key_fk(self):
+ m = MetaData(testing.db)
+ def column_reflect(insp, table, column_info):
+
+ if column_info['name'] == 'q':
+ column_info['key'] = 'qyz'
+ elif column_info['name'] == 'x':
+ column_info['key'] = 'xyz'
+
+ to_reflect = Table("to_reflect", m, autoload=True, listeners=[
+ ('column_reflect', column_reflect),
+ ])
+ related = Table("related", m, autoload=True, listeners=[
+ ('column_reflect', column_reflect),
+ ])
+
+ assert related.c.qyz.references(to_reflect.c.xyz)
+
def test_override_type(self):
def assert_(table):
assert isinstance(table.c.x.type, sa.String)