from sqlalchemy import exc
from sqlalchemy.sql import table, column, null
from sqlalchemy import util
-from sqlalchemy.testing import fixtures
metadata = MetaData()
table1 = Table('table1', metadata,
table2.c.coly]))
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
- c = u.corresponding_column(s1.c.table1_col2)
+
assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
metadata = MetaData()
a = Table('a', metadata,
Column('id', Integer, primary_key=True))
- b = Table('b', metadata,
- Column('id', Integer, primary_key=True),
- Column('aid', Integer, ForeignKey('a.id')),
- )
- j1 = a.outerjoin(b)
j2 = select([a.c.id.label('aid')]).alias('bar')
- j3 = a.join(j2, j2.c.aid==a.c.id)
+ j3 = a.join(j2, j2.c.aid == a.c.id)
j4 = select([j3]).alias('foo')
assert j4.corresponding_column(j2.c.aid) is j4.c.aid
def test_unusual_column_elements_boolean_clauselist(self):
"""test that BooleanClauseList is placed as single element in .c."""
- c2 = and_(table1.c.col2==5, table1.c.col3==4)
+ c2 = and_(table1.c.col2 == 5, table1.c.col3 == 4)
s = select([table1.c.col1, c2])
eq_(
list(s.c),
c1 = Column('c1', Integer)
c2 = Column('c2', Integer)
- s = select([c1]).where(c1==5)
+ s = select([c1]).where(c1 == 5)
t = Table('t', MetaData(), c1, c2)
t1 = Table('t1', m, Column('x', Integer))
c1 = Column('c1', Integer)
- s = select([c1]).where(c1==5).select_from(t1)
+ s = select([c1]).where(c1 == 5).select_from(t1)
t2 = Table('t2', MetaData(), c1)
als = t2t3.alias()
# test join's behavior, including natural
for left, right, expected in [
- (t1, t2, t1.c.id==t2.c.t1id),
- (t1t2, t3, t1t2.c.t2_id==t3.c.t2id),
- (t2t3, t1, t1.c.id==t3.c.t1id),
- (t2t3, t4, t2t3.c.t2_id==t4.c.t2id),
- (t2t3, t4, t2t3.c.t2_id==t4.c.t2id),
- (t2t3.join(t1), t4, t2t3.c.t2_id==t4.c.t2id),
- (t2t3.join(t1), t4, t2t3.c.t2_id==t4.c.t2id),
- (t1t2, als, t1t2.c.t2_id==als.c.t3_t2id)
+ (t1, t2, t1.c.id == t2.c.t1id),
+ (t1t2, t3, t1t2.c.t2_id == t3.c.t2id),
+ (t2t3, t1, t1.c.id == t3.c.t1id),
+ (t2t3, t4, t2t3.c.t2_id == t4.c.t2id),
+ (t2t3, t4, t2t3.c.t2_id == t4.c.t2id),
+ (t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id),
+ (t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id),
+ (t1t2, als, t1t2.c.t2_id == als.c.t3_t2id)
]:
assert expected.compare(
left.join(right).onclause
Column('q', Integer, ForeignKey('t22.id')),
)
t2 = Table('t2', m, Column('id', Integer))
- assert sql_util.join_condition(t1, t2).compare(t1.c.x==t2.c.id)
- assert sql_util.join_condition(t2, t1).compare(t1.c.x==t2.c.id)
+ assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id)
+ assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id)
def test_join_cond_no_such_unrelated_column(self):
m = MetaData()
t1 = Table('t1', m, Column('x', Integer, ForeignKey('t2.id')),
Column('y', Integer, ForeignKey('t3.q')))
t2 = Table('t2', m, Column('id', Integer))
- t3 = Table('t3', m, Column('id', Integer))
- assert sql_util.join_condition(t1, t2).compare(t1.c.x==t2.c.id)
- assert sql_util.join_condition(t2, t1).compare(t1.c.x==t2.c.id)
+ Table('t3', m, Column('id', Integer))
+ assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id)
+ assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id)
def test_join_cond_no_such_related_table(self):
m1 = MetaData()
t2 = Table('t2', m, Column('id', Integer))
assert_raises_message(
exc.NoReferencedColumnError,
- "Could not create ForeignKey 't2.q' on table 't1': table 't2' has no column named 'q'",
+ "Could not create ForeignKey 't2.q' on table 't1': "
+ "table 't2' has no column named 'q'",
sql_util.join_condition, t1, t2
)
assert_raises_message(
exc.NoReferencedColumnError,
- "Could not create ForeignKey 't2.q' on table 't1': table 't2' has no column named 'q'",
+ "Could not create ForeignKey 't2.q' on table 't1': "
+ "table 't2' has no column named 'q'",
sql_util.join_condition, t2, t1
)
Column('id', Integer, ForeignKey('a.id'), primary_key=True),
Column('x', Integer, primary_key=True))
- j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5))
+ j = a.join(b, and_(a.c.id == b.c.id, b.c.x == 5))
assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :x_1", str(j)
assert list(j.primary_key) == [a.c.id, b.c.x]
def test_onclause_direction(self):
metadata = MetaData()
- employee = Table( 'Employee', metadata,
+ employee = Table('Employee', metadata,
Column('name', String(100)),
- Column('id', Integer, primary_key= True),
+ Column('id', Integer, primary_key=True),
)
engineer = Table('Engineer', metadata,
def visit_binary(b):
b.right = table1.c.col2
- b2 = visitors.cloned_traverse(bin, {}, {'binary':visit_binary})
+ b2 = visitors.cloned_traverse(bin, {}, {'binary': visit_binary})
assert str(b2) == "table1.col1 = table1.col2"
- b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary'
- : visit_binary})
+ b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary':
+ visit_binary})
assert str(b3) == 'table1.col1 = table1.col2'
def visit_binary(b):
b.left = bindparam('bar')
- b4 = visitors.cloned_traverse(b2, {}, {'binary':visit_binary})
+ b4 = visitors.cloned_traverse(b2, {}, {'binary': visit_binary})
assert str(b4) == ":bar = table1.col2"
- b5 = visitors.cloned_traverse(b3, {}, {'binary':visit_binary})
+ b5 = visitors.cloned_traverse(b3, {}, {'binary': visit_binary})
assert str(b5) == ":bar = table1.col2"
def test_annotate_aliased(self):
bin = table1.c.col1 == bindparam('foo', value=None)
- b2 = sql_util._deep_annotate(bin, {'_orm_adapt':True})
+ b2 = sql_util._deep_annotate(bin, {'_orm_adapt': True})
b3 = sql_util._deep_deannotate(b2)
b4 = sql_util._deep_deannotate(bin)
table2 = table('table2', column('y'))
a1 = table1.alias()
s = select([a1.c.x]).select_from(
- a1.join(table2, a1.c.x==table2.c.y)
+ a1.join(table2, a1.c.x == table2.c.y)
)
for sel in (
sql_util._deep_deannotate(s),
visitors.cloned_traverse(s, {}, {}),
- visitors.replacement_traverse(s, {}, lambda x:None)
+ visitors.replacement_traverse(s, {}, lambda x: None)
):
# the columns clause isn't changed at all
assert sel._raw_columns[0].table is a1
# partially, each element is copied unconditionally
# when encountered.
for sel in (
- sql_util._deep_deannotate(s, {"foo":"bar"}),
- sql_util._deep_annotate(s, {'foo':'bar'}),
+ sql_util._deep_deannotate(s, {"foo": "bar"}),
+ sql_util._deep_annotate(s, {'foo': 'bar'}),
):
assert sel._froms[0] is not sel._froms[1].left
s = select([t1.c.col1._annotate({"foo":"bar"})])
s2 = select([t1.c.col1._annotate({"bat":"hoho"})])
s3 = s.union(s2)
- sel = sql_util._deep_annotate(s3, {"new":"thing"})
+ sel = sql_util._deep_annotate(s3, {"new": "thing"})
eq_(
sel.selects[0]._raw_columns[0]._annotations,
- {"foo":"bar", "new":"thing"}
+ {"foo": "bar", "new": "thing"}
)
eq_(
sel.selects[1]._raw_columns[0]._annotations,
- {"bat":"hoho", "new":"thing"}
+ {"bat": "hoho", "new": "thing"}
)
def test_deannotate_2(self):
table1 = table('table1', column("col1"), column("col2"))
- j = table1.c.col1._annotate({"remote":True}) == \
- table1.c.col2._annotate({"local":True})
+ j = table1.c.col1._annotate({"remote": True}) == \
+ table1.c.col2._annotate({"local": True})
j2 = sql_util._deep_deannotate(j)
eq_(
- j.left._annotations, {"remote":True}
+ j.left._annotations, {"remote": True}
)
eq_(
j2.left._annotations, {}
table1 = table('table1', column("col1"), column("col2"),
column("col3"), column("col4"))
j = and_(
- table1.c.col1._annotate({"remote":True})==
- table1.c.col2._annotate({"local":True}),
- table1.c.col3._annotate({"remote":True})==
- table1.c.col4._annotate({"local":True})
+ table1.c.col1._annotate({"remote": True}) ==
+ table1.c.col2._annotate({"local": True}),
+ table1.c.col3._annotate({"remote": True}) ==
+ table1.c.col4._annotate({"local": True})
)
j2 = sql_util._deep_deannotate(j)
eq_(
- j.clauses[0].left._annotations, {"remote":True}
+ j.clauses[0].left._annotations, {"remote": True}
)
eq_(
j2.clauses[0].left._annotations, {}
table2 = table('table2', column('y'))
a1 = table1.alias()
s = select([a1.c.x]).select_from(
- a1.join(table2, a1.c.x==table2.c.y)
+ a1.join(table2, a1.c.x == table2.c.y)
)
assert_s = select([select([s])])
for fn in (
sql_util._deep_deannotate,
- lambda s: sql_util._deep_annotate(s, {'foo':'bar'}),
- lambda s:visitors.cloned_traverse(s, {}, {}),
- lambda s:visitors.replacement_traverse(s, {}, lambda x:None)
+ lambda s: sql_util._deep_annotate(s, {'foo': 'bar'}),
+ lambda s: visitors.cloned_traverse(s, {}, {}),
+ lambda s: visitors.replacement_traverse(s, {}, lambda x: None)
):
sel = fn(select([fn(select([fn(s)]))]))
def test_bind_unique_test(self):
- t1 = table('t', column('a'), column('b'))
+ table('t', column('a'), column('b'))
b = bindparam("bind", value="x", unique=True)
# the annotation of "b" should render the
# same. The "unique" test in compiler should
# also pass, [ticket:2425]
- eq_(str(or_(b, b._annotate({"foo":"bar"}))),
+ eq_(str(or_(b, b._annotate({"foo": "bar"}))),
":bind_1 OR :bind_1")