From: Jason Kirtland Date: Sat, 13 Oct 2007 20:35:42 +0000 (+0000) Subject: set svn:eol-style native X-Git-Tag: rel_0_4_0~29 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=224ff658e026fd5f9e77b871a31ac9c692ca1429;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git set svn:eol-style native --- diff --git a/doc/build/testdocs.py b/doc/build/testdocs.py index 0f5f693300..998320fb0d 100644 --- a/doc/build/testdocs.py +++ b/doc/build/testdocs.py @@ -1,71 +1,71 @@ -import sys -sys.path = ['../../lib', './lib/'] + sys.path - -import os -import re -import doctest -import sqlalchemy.util as util -import sqlalchemy.logging as salog -import logging - -salog.default_enabled=True -rootlogger = logging.getLogger('sqlalchemy') -rootlogger.setLevel(logging.NOTSET) -class MyStream(object): - def write(self, string): - sys.stdout.write(string) - sys.stdout.flush() - def flush(self): - pass -handler = logging.StreamHandler(MyStream()) -handler.setFormatter(logging.Formatter('%(message)s')) -rootlogger.addHandler(handler) - - -def teststring(s, name, globs=None, verbose=None, report=True, - optionflags=0, extraglobs=None, raise_on_error=False, - parser=doctest.DocTestParser()): - - from doctest import DebugRunner, DocTestRunner, master - - # Assemble the globals. - if globs is None: - globs = {} - else: - globs = globs.copy() - if extraglobs is not None: - globs.update(extraglobs) - - if raise_on_error: - runner = DebugRunner(verbose=verbose, optionflags=optionflags) - else: - runner = DocTestRunner(verbose=verbose, optionflags=optionflags) - - test = parser.get_doctest(s, globs, name, name, 0) - runner.run(test) - - if report: - runner.summarize() - - if master is None: - master = runner - else: - master.merge(runner) - - return runner.failures, runner.tries - -def replace_file(s, newfile): - engine = r"'(sqlite|postgres|mysql):///.*'" - engine = re.compile(engine, re.MULTILINE) - s, n = re.subn(engine, "'sqlite:///" + newfile + "'", s) - if not n: - raise ValueError("Couldn't find suitable create_engine call to replace '%s' in it" % oldfile) - return s - -for filename in ('ormtutorial', 'sqlexpression'): - filename = 'content/%s.txt' % filename - s = open(filename).read() - #s = replace_file(s, ':memory:') - s = re.sub(r'{(?:stop|sql|opensql)}', '', s) - teststring(s, filename) - +import sys +sys.path = ['../../lib', './lib/'] + sys.path + +import os +import re +import doctest +import sqlalchemy.util as util +import sqlalchemy.logging as salog +import logging + +salog.default_enabled=True +rootlogger = logging.getLogger('sqlalchemy') +rootlogger.setLevel(logging.NOTSET) +class MyStream(object): + def write(self, string): + sys.stdout.write(string) + sys.stdout.flush() + def flush(self): + pass +handler = logging.StreamHandler(MyStream()) +handler.setFormatter(logging.Formatter('%(message)s')) +rootlogger.addHandler(handler) + + +def teststring(s, name, globs=None, verbose=None, report=True, + optionflags=0, extraglobs=None, raise_on_error=False, + parser=doctest.DocTestParser()): + + from doctest import DebugRunner, DocTestRunner, master + + # Assemble the globals. + if globs is None: + globs = {} + else: + globs = globs.copy() + if extraglobs is not None: + globs.update(extraglobs) + + if raise_on_error: + runner = DebugRunner(verbose=verbose, optionflags=optionflags) + else: + runner = DocTestRunner(verbose=verbose, optionflags=optionflags) + + test = parser.get_doctest(s, globs, name, name, 0) + runner.run(test) + + if report: + runner.summarize() + + if master is None: + master = runner + else: + master.merge(runner) + + return runner.failures, runner.tries + +def replace_file(s, newfile): + engine = r"'(sqlite|postgres|mysql):///.*'" + engine = re.compile(engine, re.MULTILINE) + s, n = re.subn(engine, "'sqlite:///" + newfile + "'", s) + if not n: + raise ValueError("Couldn't find suitable create_engine call to replace '%s' in it" % oldfile) + return s + +for filename in ('ormtutorial', 'sqlexpression'): + filename = 'content/%s.txt' % filename + s = open(filename).read() + #s = replace_file(s, ':memory:') + s = re.sub(r'{(?:stop|sql|opensql)}', '', s) + teststring(s, filename) + diff --git a/test/sql/selectable.py b/test/sql/selectable.py index d2f89fdee8..83203ad8e2 100755 --- a/test/sql/selectable.py +++ b/test/sql/selectable.py @@ -1,235 +1,235 @@ -"""tests that various From objects properly export their columns, as well as -useable primary keys and foreign keys. Full relational algebra depends on -every selectable unit behaving nicely with others..""" - -import testbase -from sqlalchemy import * -from testlib import * - -metadata = MetaData() -table = Table('table1', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', String(20)), - Column('col3', Integer), - Column('colx', Integer), - -) - -table2 = Table('table2', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', Integer, ForeignKey('table1.col1')), - Column('col3', String(20)), - Column('coly', Integer), -) - -class SelectableTest(AssertMixin): - def testdistance(self): - s = select([table.c.col1.label('c2'), table.c.col1, table.c.col1.label('c1')]) - - # didnt do this yet...col.label().make_proxy() has same "distance" as col.make_proxy() so far - #assert s.corresponding_column(table.c.col1) is s.c.col1 - assert s.corresponding_column(s.c.col1) is s.c.col1 - assert s.corresponding_column(s.c.c1) is s.c.c1 - - def testjoinagainstself(self): - jj = select([table.c.col1.label('bar_col1')]) - jjj = join(table, jj, table.c.col1==jj.c.bar_col1) - - # test column directly agaisnt itself - assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 - - assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1 - - # test alias of the join, targets the column with the least - # "distance" between the requested column and the returned column - # (i.e. there is less indirection between j2.c.table1_col1 and table.c.col1, than - # there is from j2.c.bar_col1 to table.c.col1) - j2 = jjj.alias('foo') - assert j2.corresponding_column(table.c.col1) is j2.c.table1_col1 - - - def testjoinagainstjoin(self): - j = outerjoin(table, table2, table.c.col1==table2.c.col2) - jj = select([ table.c.col1.label('bar_col1')],from_obj=[j]).alias('foo') - jjj = join(table, jj, table.c.col1==jj.c.bar_col1) - assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 - - j2 = jjj.alias('foo') - print j2.corresponding_column(jjj.c.table1_col1) - assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1 - - assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1 - - def testtablealias(self): - a = table.alias('a') - - j = join(a, table2) - - criterion = a.c.col1 == table2.c.col2 - print - print str(j) - self.assert_(criterion.compare(j.onclause)) - - def testunion(self): - # tests that we can correspond a column in a Select statement with a certain Table, against - # a column in a Union where one of its underlying Selects matches to that same Table - u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) - ) - s1 = table.select(use_labels=True) - s2 = table2.select(use_labels=True) - print ["%d %s" % (id(c),c.key) for c in u.c] - c = u.corresponding_column(s1.c.table1_col2) - print "%d %s" % (id(c), c.key) - print id(u.corresponding_column(s1.c.table1_col2).table) - print id(u.c.col2.table) - assert u.corresponding_column(s1.c.table1_col2) is u.c.col2 - assert u.corresponding_column(s2.c.table2_col2) is u.c.col2 - - def testaliasunion(self): - # same as testunion, except its an alias of the union - u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) - ).alias('analias') - s1 = table.select(use_labels=True) - s2 = table2.select(use_labels=True) - assert u.corresponding_column(s1.c.table1_col2) is u.c.col2 - assert u.corresponding_column(s2.c.table2_col2) is u.c.col2 - assert u.corresponding_column(s2.c.table2_coly) is u.c.coly - assert s2.corresponding_column(u.c.coly) is s2.c.table2_coly - - def testselectunion(self): - # like testaliasunion, but off a Select off the union. - u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) - ).alias('analias') - s = select([u]) - s1 = table.select(use_labels=True) - s2 = table2.select(use_labels=True) - assert s.corresponding_column(s1.c.table1_col2) is s.c.col2 - assert s.corresponding_column(s2.c.table2_col2) is s.c.col2 - - def testunionagainstjoin(self): - # same as testunion, except its an alias of the union - u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union( - select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) - ).alias('analias') - j1 = table.join(table2) - assert u.corresponding_column(j1.c.table1_colx) is u.c.colx - assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx - - def testjoin(self): - a = join(table, table2) - print str(a.select(use_labels=True)) - b = table2.alias('b') - j = join(a, b) - print str(j) - criterion = a.c.table1_col1 == b.c.col2 - self.assert_(criterion.compare(j.onclause)) - - def testselectalias(self): - a = table.select().alias('a') - print str(a.select()) - j = join(a, table2) - - criterion = a.c.col1 == table2.c.col2 - print criterion - print j.onclause - self.assert_(criterion.compare(j.onclause)) - - def testselectlabels(self): - a = table.select(use_labels=True) - print str(a.select()) - j = join(a, table2) - - criterion = a.c.table1_col1 == table2.c.col2 - print - print str(j) - self.assert_(criterion.compare(j.onclause)) - - def testcolumnlabels(self): - a = select([table.c.col1.label('acol1'), table.c.col2.label('acol2'), table.c.col3.label('acol3')]) - print str(a) - print [c for c in a.columns] - print str(a.select()) - j = join(a, table2) - criterion = a.c.acol1 == table2.c.col2 - print str(j) - self.assert_(criterion.compare(j.onclause)) - - def testselectaliaslabels(self): - a = table2.select(use_labels=True).alias('a') - print str(a.select()) - j = join(a, table) - - criterion = table.c.col1 == a.c.table2_col2 - print str(criterion) - print str(j.onclause) - self.assert_(criterion.compare(j.onclause)) - - -class PrimaryKeyTest(AssertMixin): - def test_join_pk_collapse_implicit(self): - """test that redundant columns in a join get 'collapsed' into a minimal primary key, - which is the root column along a chain of foreign key relationships.""" - - meta = MetaData() - a = Table('a', meta, Column('id', Integer, primary_key=True)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True)) - c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True)) - d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True)) - - assert c.c.id.references(b.c.id) - assert not d.c.id.references(a.c.id) - - assert list(a.join(b).primary_key) == [a.c.id] - assert list(b.join(c).primary_key) == [b.c.id] - assert list(a.join(b).join(c).primary_key) == [a.c.id] - assert list(b.join(c).join(d).primary_key) == [b.c.id] - assert list(d.join(c).join(b).primary_key) == [b.c.id] - assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id] - - def test_join_pk_collapse_explicit(self): - """test that redundant columns in a join get 'collapsed' into a minimal primary key, - which is the root column along a chain of explicit join conditions.""" - - meta = MetaData() - a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer)) - c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True), Column('x', Integer)) - d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True), Column('x', Integer)) - - print list(a.join(b, a.c.x==b.c.id).primary_key) - assert list(a.join(b, a.c.x==b.c.id).primary_key) == [b.c.id] - assert list(b.join(c, b.c.x==c.c.id).primary_key) == [b.c.id] - assert list(a.join(b).join(c, c.c.id==b.c.x).primary_key) == [a.c.id] - assert list(b.join(c, c.c.x==b.c.id).join(d).primary_key) == [c.c.id] - assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id] - assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [c.c.id] - assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id] - - assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id] - - def test_init_doesnt_blowitaway(self): - meta = MetaData() - a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer)) - - j = a.join(b) - assert list(j.primary_key) == [a.c.id] - - j.foreign_keys - assert list(j.primary_key) == [a.c.id] - - def test_non_column_clause(self): - meta = MetaData() - a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer, primary_key=True)) - - j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5)) - assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :b_x", str(j) - assert list(j.primary_key) == [a.c.id, b.c.x] - -if __name__ == "__main__": - testbase.main() - +"""tests that various From objects properly export their columns, as well as +useable primary keys and foreign keys. Full relational algebra depends on +every selectable unit behaving nicely with others..""" + +import testbase +from sqlalchemy import * +from testlib import * + +metadata = MetaData() +table = Table('table1', metadata, + Column('col1', Integer, primary_key=True), + Column('col2', String(20)), + Column('col3', Integer), + Column('colx', Integer), + +) + +table2 = Table('table2', metadata, + Column('col1', Integer, primary_key=True), + Column('col2', Integer, ForeignKey('table1.col1')), + Column('col3', String(20)), + Column('coly', Integer), +) + +class SelectableTest(AssertMixin): + def testdistance(self): + s = select([table.c.col1.label('c2'), table.c.col1, table.c.col1.label('c1')]) + + # didnt do this yet...col.label().make_proxy() has same "distance" as col.make_proxy() so far + #assert s.corresponding_column(table.c.col1) is s.c.col1 + assert s.corresponding_column(s.c.col1) is s.c.col1 + assert s.corresponding_column(s.c.c1) is s.c.c1 + + def testjoinagainstself(self): + jj = select([table.c.col1.label('bar_col1')]) + jjj = join(table, jj, table.c.col1==jj.c.bar_col1) + + # test column directly agaisnt itself + assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 + + assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1 + + # test alias of the join, targets the column with the least + # "distance" between the requested column and the returned column + # (i.e. there is less indirection between j2.c.table1_col1 and table.c.col1, than + # there is from j2.c.bar_col1 to table.c.col1) + j2 = jjj.alias('foo') + assert j2.corresponding_column(table.c.col1) is j2.c.table1_col1 + + + def testjoinagainstjoin(self): + j = outerjoin(table, table2, table.c.col1==table2.c.col2) + jj = select([ table.c.col1.label('bar_col1')],from_obj=[j]).alias('foo') + jjj = join(table, jj, table.c.col1==jj.c.bar_col1) + assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 + + j2 = jjj.alias('foo') + print j2.corresponding_column(jjj.c.table1_col1) + assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1 + + assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1 + + def testtablealias(self): + a = table.alias('a') + + j = join(a, table2) + + criterion = a.c.col1 == table2.c.col2 + print + print str(j) + self.assert_(criterion.compare(j.onclause)) + + def testunion(self): + # tests that we can correspond a column in a Select statement with a certain Table, against + # a column in a Union where one of its underlying Selects matches to that same Table + u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union( + select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) + ) + s1 = table.select(use_labels=True) + s2 = table2.select(use_labels=True) + print ["%d %s" % (id(c),c.key) for c in u.c] + c = u.corresponding_column(s1.c.table1_col2) + print "%d %s" % (id(c), c.key) + print id(u.corresponding_column(s1.c.table1_col2).table) + print id(u.c.col2.table) + assert u.corresponding_column(s1.c.table1_col2) is u.c.col2 + assert u.corresponding_column(s2.c.table2_col2) is u.c.col2 + + def testaliasunion(self): + # same as testunion, except its an alias of the union + u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union( + select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) + ).alias('analias') + s1 = table.select(use_labels=True) + s2 = table2.select(use_labels=True) + assert u.corresponding_column(s1.c.table1_col2) is u.c.col2 + assert u.corresponding_column(s2.c.table2_col2) is u.c.col2 + assert u.corresponding_column(s2.c.table2_coly) is u.c.coly + assert s2.corresponding_column(u.c.coly) is s2.c.table2_coly + + def testselectunion(self): + # like testaliasunion, but off a Select off the union. + u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union( + select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) + ).alias('analias') + s = select([u]) + s1 = table.select(use_labels=True) + s2 = table2.select(use_labels=True) + assert s.corresponding_column(s1.c.table1_col2) is s.c.col2 + assert s.corresponding_column(s2.c.table2_col2) is s.c.col2 + + def testunionagainstjoin(self): + # same as testunion, except its an alias of the union + u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union( + select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly]) + ).alias('analias') + j1 = table.join(table2) + assert u.corresponding_column(j1.c.table1_colx) is u.c.colx + assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx + + def testjoin(self): + a = join(table, table2) + print str(a.select(use_labels=True)) + b = table2.alias('b') + j = join(a, b) + print str(j) + criterion = a.c.table1_col1 == b.c.col2 + self.assert_(criterion.compare(j.onclause)) + + def testselectalias(self): + a = table.select().alias('a') + print str(a.select()) + j = join(a, table2) + + criterion = a.c.col1 == table2.c.col2 + print criterion + print j.onclause + self.assert_(criterion.compare(j.onclause)) + + def testselectlabels(self): + a = table.select(use_labels=True) + print str(a.select()) + j = join(a, table2) + + criterion = a.c.table1_col1 == table2.c.col2 + print + print str(j) + self.assert_(criterion.compare(j.onclause)) + + def testcolumnlabels(self): + a = select([table.c.col1.label('acol1'), table.c.col2.label('acol2'), table.c.col3.label('acol3')]) + print str(a) + print [c for c in a.columns] + print str(a.select()) + j = join(a, table2) + criterion = a.c.acol1 == table2.c.col2 + print str(j) + self.assert_(criterion.compare(j.onclause)) + + def testselectaliaslabels(self): + a = table2.select(use_labels=True).alias('a') + print str(a.select()) + j = join(a, table) + + criterion = table.c.col1 == a.c.table2_col2 + print str(criterion) + print str(j.onclause) + self.assert_(criterion.compare(j.onclause)) + + +class PrimaryKeyTest(AssertMixin): + def test_join_pk_collapse_implicit(self): + """test that redundant columns in a join get 'collapsed' into a minimal primary key, + which is the root column along a chain of foreign key relationships.""" + + meta = MetaData() + a = Table('a', meta, Column('id', Integer, primary_key=True)) + b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True)) + c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True)) + d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True)) + + assert c.c.id.references(b.c.id) + assert not d.c.id.references(a.c.id) + + assert list(a.join(b).primary_key) == [a.c.id] + assert list(b.join(c).primary_key) == [b.c.id] + assert list(a.join(b).join(c).primary_key) == [a.c.id] + assert list(b.join(c).join(d).primary_key) == [b.c.id] + assert list(d.join(c).join(b).primary_key) == [b.c.id] + assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id] + + def test_join_pk_collapse_explicit(self): + """test that redundant columns in a join get 'collapsed' into a minimal primary key, + which is the root column along a chain of explicit join conditions.""" + + meta = MetaData() + a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer)) + b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer)) + c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True), Column('x', Integer)) + d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True), Column('x', Integer)) + + print list(a.join(b, a.c.x==b.c.id).primary_key) + assert list(a.join(b, a.c.x==b.c.id).primary_key) == [b.c.id] + assert list(b.join(c, b.c.x==c.c.id).primary_key) == [b.c.id] + assert list(a.join(b).join(c, c.c.id==b.c.x).primary_key) == [a.c.id] + assert list(b.join(c, c.c.x==b.c.id).join(d).primary_key) == [c.c.id] + assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id] + assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [c.c.id] + assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id] + + assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id] + + def test_init_doesnt_blowitaway(self): + meta = MetaData() + a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer)) + b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer)) + + j = a.join(b) + assert list(j.primary_key) == [a.c.id] + + j.foreign_keys + assert list(j.primary_key) == [a.c.id] + + def test_non_column_clause(self): + meta = MetaData() + a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer)) + b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer, primary_key=True)) + + j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5)) + assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :b_x", str(j) + assert list(j.primary_key) == [a.c.id, b.c.x] + +if __name__ == "__main__": + testbase.main() +