def reflecttable(self, connection, table, include_columns):
import sqlalchemy.databases.information_schema as ischema
-
# Get base columns
if table.schema is not None:
current_schema = table.schema
fknm, scols, rcols = (None, [], [])
for r in rows:
scol, rschema, rtbl, rcol, rfknm, fkmatch, fkuprule, fkdelrule = r
- schema.Table(rtbl, table.metadata, schema=rschema, autoload=True, autoload_with=connection)
+ # if the reflected schema is the default schema then don't set it because this will
+ # play into the metadata key causing duplicates.
+ if rschema == current_schema:
+ schema.Table(rtbl, table.metadata, autoload=True, autoload_with=connection)
+ else:
+ schema.Table(rtbl, table.metadata, schema=rschema, autoload=True, autoload_with=connection)
if rfknm != fknm:
if fknm:
table.append_constraint(schema.ForeignKeyConstraint(scols, [_gen_fkref(table, s, t, c) for s, t, c in rcols], fknm))
self.assert_compile(t.update(inline=True, values={'col3':'foo'}), "UPDATE test SET col1=foo(:foo_1), col2=(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM foo), col3=:col3")
class SchemaTest(TestBase, AssertsCompiledSQL):
- @testing.fails_on('mssql')
def test_select(self):
# these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables
self.assert_compile(table4.select(), "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id, remote_owner.remotetable.value FROM remote_owner.remotetable")