parent = Table('parent', meta, autoload=True, schema='ed')
child = Table('child', meta, autoload=True, schema='ed')
- self.assert_compile(parent.join(child), "ed.parent JOIN ed.child ON parent.id = child.parent_id")
+ self.assert_compile(parent.join(child), "ed.parent JOIN ed.child ON ed.parent.id = ed.child.parent_id")
select([parent, child]).select_from(parent.join(child)).execute().fetchall()
def test_reflect_local_to_remote(self):
meta = MetaData(testing.db)
lcl = Table('localtable', meta, autoload=True)
parent = meta.tables['ed.parent']
- self.assert_compile(parent.join(lcl), "ed.parent JOIN localtable ON parent.id = localtable.parent_id")
+ self.assert_compile(parent.join(lcl), "ed.parent JOIN localtable ON ed.parent.id = localtable.parent_id")
select([parent, lcl]).select_from(parent.join(lcl)).execute().fetchall()
finally:
testing.db.execute("DROP TABLE localtable")
parent = Table('parent', meta, autoload=True, schema='ed')
child = Table('child', meta, autoload=True, schema='ed')
- self.assert_compile(parent.join(child), "ed.parent JOIN ed.child ON parent.id = child.parent_id")
+ self.assert_compile(parent.join(child), "ed.parent JOIN ed.child ON ed.parent.id = ed.child.parent_id")
select([parent, child]).select_from(parent.join(child)).execute().fetchall()
def test_reflect_alt_owner_synonyms(self):
meta = MetaData(testing.db)
lcl = Table('localtable', meta, autoload=True, oracle_resolve_synonyms=True)
parent = meta.tables['ed.ptable']
- self.assert_compile(parent.join(lcl), "ed.ptable JOIN localtable ON ptable.id = localtable.parent_id")
+ self.assert_compile(parent.join(lcl), "ed.ptable JOIN localtable ON ed.ptable.id = localtable.parent_id")
select([parent, lcl]).select_from(parent.join(lcl)).execute().fetchall()
finally:
testing.db.execute("DROP TABLE localtable")
meta = MetaData(testing.db)
parent = Table('ptable', meta, autoload=True, schema='ed', oracle_resolve_synonyms=True)
child = Table('ctable', meta, autoload=True, schema='ed', oracle_resolve_synonyms=True)
- self.assert_compile(parent.join(child), "ed.ptable JOIN ed.ctable ON ptable.id = ctable.parent_id")
+ self.assert_compile(parent.join(child), "ed.ptable JOIN ed.ctable ON ed.ptable.id = ed.ctable.parent_id")
select([parent, child]).select_from(parent.join(child)).execute().fetchall()
def define_tables(self, metadata):
global nodes
nodes = Table('nodes', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('parent_id', Integer, ForeignKey('nodes.id')),
Column('data', String(30)))
def define_tables(self, metadata):
global nodes, node_to_nodes
nodes = Table('nodes', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30)))
node_to_nodes =Table('node_to_nodes', metadata,
if fk.onupdate is None:
fk.onupdate = 'CASCADE'
+ if testing.against('oracle'):
+ pk_seqs = [col for col in args if isinstance(col, schema.Column)
+ and col.primary_key and getattr(col, '_needs_autoincrement', False)]
+ for c in pk_seqs:
+ c.args.append(schema.Sequence(args[0] + '_' + c.name + '_seq', optional=True))
return schema.Table(*args, **kw)
test_opts = dict([(k,kw.pop(k)) for k in kw.keys()
if k.startswith('test_')])
+ c = schema.Column(*args, **kw)
if testing.against('oracle'):
if 'test_needs_autoincrement' in test_opts:
- args = list(args)
- args.append(schema.Sequence(args[0] + "_seq", optional=True))
-
- return schema.Column(*args, **kw)
+ c._needs_autoincrement = True
+ return c