]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
set svn:eol-style native
authorJason Kirtland <jek@discorporate.us>
Sat, 13 Oct 2007 20:35:42 +0000 (20:35 +0000)
committerJason Kirtland <jek@discorporate.us>
Sat, 13 Oct 2007 20:35:42 +0000 (20:35 +0000)
doc/build/testdocs.py
test/sql/selectable.py

index 0f5f693300f3854b261b1cf8c862e63cecab31f7..998320fb0d1dc7b9c975d4a93517f5dee96b66da 100644 (file)
@@ -1,71 +1,71 @@
-import sys\r
-sys.path = ['../../lib', './lib/'] + sys.path\r
-\r
-import os\r
-import re\r
-import doctest\r
-import sqlalchemy.util as util\r
-import sqlalchemy.logging as salog\r
-import logging\r
-\r
-salog.default_enabled=True\r
-rootlogger = logging.getLogger('sqlalchemy')\r
-rootlogger.setLevel(logging.NOTSET)\r
-class MyStream(object):\r
-    def write(self, string):\r
-        sys.stdout.write(string)\r
-        sys.stdout.flush()\r
-    def flush(self):\r
-        pass\r
-handler = logging.StreamHandler(MyStream())\r
-handler.setFormatter(logging.Formatter('%(message)s'))\r
-rootlogger.addHandler(handler)\r
-\r
-\r
-def teststring(s, name, globs=None, verbose=None, report=True, \r
-               optionflags=0, extraglobs=None, raise_on_error=False, \r
-               parser=doctest.DocTestParser()):\r
-\r
-    from doctest import DebugRunner, DocTestRunner, master\r
-\r
-    # Assemble the globals.\r
-    if globs is None:\r
-        globs = {}\r
-    else:\r
-        globs = globs.copy()\r
-    if extraglobs is not None:\r
-        globs.update(extraglobs)\r
-\r
-    if raise_on_error:\r
-        runner = DebugRunner(verbose=verbose, optionflags=optionflags)\r
-    else:\r
-        runner = DocTestRunner(verbose=verbose, optionflags=optionflags)\r
-\r
-    test = parser.get_doctest(s, globs, name, name, 0)\r
-    runner.run(test)\r
-\r
-    if report:\r
-        runner.summarize()\r
-\r
-    if master is None:\r
-        master = runner\r
-    else:\r
-        master.merge(runner)\r
-\r
-    return runner.failures, runner.tries\r
-\r
-def replace_file(s, newfile):\r
-    engine = r"'(sqlite|postgres|mysql):///.*'"\r
-    engine = re.compile(engine, re.MULTILINE)\r
-    s, n = re.subn(engine, "'sqlite:///" + newfile + "'", s)\r
-    if not n:\r
-        raise ValueError("Couldn't find suitable create_engine call to replace '%s' in it" % oldfile)\r
-    return s\r
-\r
-for filename in ('ormtutorial', 'sqlexpression'):\r
-       filename = 'content/%s.txt' % filename\r
-       s = open(filename).read()\r
-       #s = replace_file(s, ':memory:')\r
-       s = re.sub(r'{(?:stop|sql|opensql)}', '', s)\r
-       teststring(s, filename)\r
-\r
+import sys
+sys.path = ['../../lib', './lib/'] + sys.path
+
+import os
+import re
+import doctest
+import sqlalchemy.util as util
+import sqlalchemy.logging as salog
+import logging
+
+salog.default_enabled=True
+rootlogger = logging.getLogger('sqlalchemy')
+rootlogger.setLevel(logging.NOTSET)
+class MyStream(object):
+    def write(self, string):
+        sys.stdout.write(string)
+        sys.stdout.flush()
+    def flush(self):
+        pass
+handler = logging.StreamHandler(MyStream())
+handler.setFormatter(logging.Formatter('%(message)s'))
+rootlogger.addHandler(handler)
+
+
+def teststring(s, name, globs=None, verbose=None, report=True, 
+               optionflags=0, extraglobs=None, raise_on_error=False, 
+               parser=doctest.DocTestParser()):
+
+    from doctest import DebugRunner, DocTestRunner, master
+
+    # Assemble the globals.
+    if globs is None:
+        globs = {}
+    else:
+        globs = globs.copy()
+    if extraglobs is not None:
+        globs.update(extraglobs)
+
+    if raise_on_error:
+        runner = DebugRunner(verbose=verbose, optionflags=optionflags)
+    else:
+        runner = DocTestRunner(verbose=verbose, optionflags=optionflags)
+
+    test = parser.get_doctest(s, globs, name, name, 0)
+    runner.run(test)
+
+    if report:
+        runner.summarize()
+
+    if master is None:
+        master = runner
+    else:
+        master.merge(runner)
+
+    return runner.failures, runner.tries
+
+def replace_file(s, newfile):
+    engine = r"'(sqlite|postgres|mysql):///.*'"
+    engine = re.compile(engine, re.MULTILINE)
+    s, n = re.subn(engine, "'sqlite:///" + newfile + "'", s)
+    if not n:
+        raise ValueError("Couldn't find suitable create_engine call to replace '%s' in it" % oldfile)
+    return s
+
+for filename in ('ormtutorial', 'sqlexpression'):
+       filename = 'content/%s.txt' % filename
+       s = open(filename).read()
+       #s = replace_file(s, ':memory:')
+       s = re.sub(r'{(?:stop|sql|opensql)}', '', s)
+       teststring(s, filename)
+
index d2f89fdee84b0ac1efe8cdb976a0efd0322f765e..83203ad8e2ab859a7dde0e08153841858851bfa7 100755 (executable)
-"""tests that various From objects properly export their columns, as well as\r
-useable primary keys and foreign keys.  Full relational algebra depends on\r
-every selectable unit behaving nicely with others.."""\r
\r
-import testbase\r
-from sqlalchemy import *\r
-from testlib import *\r
-\r
-metadata = MetaData()\r
-table = Table('table1', metadata, \r
-    Column('col1', Integer, primary_key=True),\r
-    Column('col2', String(20)),\r
-    Column('col3', Integer),\r
-    Column('colx', Integer),\r
-    \r
-)\r
-\r
-table2 = Table('table2', metadata,\r
-    Column('col1', Integer, primary_key=True),\r
-    Column('col2', Integer, ForeignKey('table1.col1')),\r
-    Column('col3', String(20)),\r
-    Column('coly', Integer),\r
-)\r
-\r
-class SelectableTest(AssertMixin):\r
-    def testdistance(self):\r
-        s = select([table.c.col1.label('c2'), table.c.col1, table.c.col1.label('c1')])\r
-\r
-        # didnt do this yet...col.label().make_proxy() has same "distance" as col.make_proxy() so far\r
-        #assert s.corresponding_column(table.c.col1) is s.c.col1\r
-        assert s.corresponding_column(s.c.col1) is s.c.col1\r
-        assert s.corresponding_column(s.c.c1) is s.c.c1\r
-        \r
-    def testjoinagainstself(self):\r
-        jj = select([table.c.col1.label('bar_col1')])\r
-        jjj = join(table, jj, table.c.col1==jj.c.bar_col1)\r
-        \r
-        # test column directly agaisnt itself\r
-        assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1\r
-\r
-        assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1\r
-        \r
-        # test alias of the join, targets the column with the least \r
-        # "distance" between the requested column and the returned column\r
-        # (i.e. there is less indirection between j2.c.table1_col1 and table.c.col1, than\r
-        # there is from j2.c.bar_col1 to table.c.col1)\r
-        j2 = jjj.alias('foo')\r
-        assert j2.corresponding_column(table.c.col1) is j2.c.table1_col1\r
-        \r
-\r
-    def testjoinagainstjoin(self):\r
-        j  = outerjoin(table, table2, table.c.col1==table2.c.col2)\r
-        jj = select([ table.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')\r
-        jjj = join(table, jj, table.c.col1==jj.c.bar_col1)\r
-        assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1\r
-        \r
-        j2 = jjj.alias('foo')\r
-        print j2.corresponding_column(jjj.c.table1_col1)\r
-        assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1\r
-        \r
-        assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1\r
-        \r
-    def testtablealias(self):\r
-        a = table.alias('a')\r
-        \r
-        j = join(a, table2)\r
-        \r
-        criterion = a.c.col1 == table2.c.col2\r
-        print\r
-        print str(j)\r
-        self.assert_(criterion.compare(j.onclause))\r
-\r
-    def testunion(self):\r
-        # tests that we can correspond a column in a Select statement with a certain Table, against\r
-        # a column in a Union where one of its underlying Selects matches to that same Table\r
-        u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(\r
-                select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])\r
-            )\r
-        s1 = table.select(use_labels=True)\r
-        s2 = table2.select(use_labels=True)\r
-        print ["%d %s" % (id(c),c.key) for c in u.c]\r
-        c = u.corresponding_column(s1.c.table1_col2)\r
-        print "%d %s" % (id(c), c.key)\r
-        print id(u.corresponding_column(s1.c.table1_col2).table)\r
-        print id(u.c.col2.table)\r
-        assert u.corresponding_column(s1.c.table1_col2) is u.c.col2\r
-        assert u.corresponding_column(s2.c.table2_col2) is u.c.col2\r
-\r
-    def testaliasunion(self):\r
-        # same as testunion, except its an alias of the union\r
-        u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(\r
-                select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])\r
-            ).alias('analias')\r
-        s1 = table.select(use_labels=True)\r
-        s2 = table2.select(use_labels=True)\r
-        assert u.corresponding_column(s1.c.table1_col2) is u.c.col2\r
-        assert u.corresponding_column(s2.c.table2_col2) is u.c.col2\r
-        assert u.corresponding_column(s2.c.table2_coly) is u.c.coly\r
-        assert s2.corresponding_column(u.c.coly) is s2.c.table2_coly\r
-\r
-    def testselectunion(self):\r
-        # like testaliasunion, but off a Select off the union.\r
-        u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(\r
-                select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])\r
-            ).alias('analias')\r
-        s = select([u])\r
-        s1 = table.select(use_labels=True)\r
-        s2 = table2.select(use_labels=True)\r
-        assert s.corresponding_column(s1.c.table1_col2) is s.c.col2\r
-        assert s.corresponding_column(s2.c.table2_col2) is s.c.col2\r
-\r
-    def testunionagainstjoin(self):\r
-        # same as testunion, except its an alias of the union\r
-        u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(\r
-                select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])\r
-            ).alias('analias')\r
-        j1 = table.join(table2)\r
-        assert u.corresponding_column(j1.c.table1_colx) is u.c.colx\r
-        assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx\r
-        \r
-    def testjoin(self):\r
-        a = join(table, table2)\r
-        print str(a.select(use_labels=True))\r
-        b = table2.alias('b')\r
-        j = join(a, b)\r
-        print str(j)\r
-        criterion = a.c.table1_col1 == b.c.col2\r
-        self.assert_(criterion.compare(j.onclause))\r
-\r
-    def testselectalias(self):\r
-        a = table.select().alias('a')\r
-        print str(a.select())\r
-        j = join(a, table2)\r
-        \r
-        criterion = a.c.col1 == table2.c.col2\r
-        print criterion\r
-        print j.onclause\r
-        self.assert_(criterion.compare(j.onclause))\r
-\r
-    def testselectlabels(self):\r
-        a = table.select(use_labels=True)\r
-        print str(a.select())\r
-        j = join(a, table2)\r
-        \r
-        criterion = a.c.table1_col1 == table2.c.col2\r
-        print\r
-        print str(j)\r
-        self.assert_(criterion.compare(j.onclause))\r
-\r
-    def testcolumnlabels(self):\r
-        a = select([table.c.col1.label('acol1'), table.c.col2.label('acol2'), table.c.col3.label('acol3')])\r
-        print str(a)\r
-        print [c for c in a.columns]\r
-        print str(a.select())\r
-        j = join(a, table2)\r
-        criterion = a.c.acol1 == table2.c.col2\r
-        print str(j)\r
-        self.assert_(criterion.compare(j.onclause))\r
-        \r
-    def testselectaliaslabels(self):\r
-        a = table2.select(use_labels=True).alias('a')\r
-        print str(a.select())\r
-        j = join(a, table)\r
-        \r
-        criterion =  table.c.col1 == a.c.table2_col2\r
-        print str(criterion)\r
-        print str(j.onclause)\r
-        self.assert_(criterion.compare(j.onclause))\r
-        \r
-\r
-class PrimaryKeyTest(AssertMixin):\r
-    def test_join_pk_collapse_implicit(self):\r
-        """test that redundant columns in a join get 'collapsed' into a minimal primary key, \r
-        which is the root column along a chain of foreign key relationships."""\r
-        \r
-        meta = MetaData()\r
-        a = Table('a', meta, Column('id', Integer, primary_key=True))\r
-        b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True))\r
-        c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True))\r
-        d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True))\r
-\r
-        assert c.c.id.references(b.c.id)\r
-        assert not d.c.id.references(a.c.id)\r
-        \r
-        assert list(a.join(b).primary_key) == [a.c.id]\r
-        assert list(b.join(c).primary_key) == [b.c.id]\r
-        assert list(a.join(b).join(c).primary_key) == [a.c.id]\r
-        assert list(b.join(c).join(d).primary_key) == [b.c.id]\r
-        assert list(d.join(c).join(b).primary_key) == [b.c.id]\r
-        assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id]\r
-\r
-    def test_join_pk_collapse_explicit(self):\r
-        """test that redundant columns in a join get 'collapsed' into a minimal primary key, \r
-        which is the root column along a chain of explicit join conditions."""\r
-\r
-        meta = MetaData()\r
-        a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))\r
-        b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer))\r
-        c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True), Column('x', Integer))\r
-        d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True), Column('x', Integer))\r
-\r
-        print list(a.join(b, a.c.x==b.c.id).primary_key)\r
-        assert list(a.join(b, a.c.x==b.c.id).primary_key) == [b.c.id]\r
-        assert list(b.join(c, b.c.x==c.c.id).primary_key) == [b.c.id]\r
-        assert list(a.join(b).join(c, c.c.id==b.c.x).primary_key) == [a.c.id]\r
-        assert list(b.join(c, c.c.x==b.c.id).join(d).primary_key) == [c.c.id]\r
-        assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id]\r
-        assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [c.c.id]\r
-        assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id]\r
-        \r
-        assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id]\r
-    \r
-    def test_init_doesnt_blowitaway(self):\r
-        meta = MetaData()\r
-        a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))\r
-        b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer))\r
-\r
-        j = a.join(b)\r
-        assert list(j.primary_key) == [a.c.id]\r
-        \r
-        j.foreign_keys\r
-        assert list(j.primary_key) == [a.c.id]\r
-\r
-    def test_non_column_clause(self):\r
-        meta = MetaData()\r
-        a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))\r
-        b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer, primary_key=True))\r
-\r
-        j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5))\r
-        assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :b_x", str(j)\r
-        assert list(j.primary_key) == [a.c.id, b.c.x]\r
-\r
-if __name__ == "__main__":\r
-    testbase.main()\r
-\r
+"""tests that various From objects properly export their columns, as well as
+useable primary keys and foreign keys.  Full relational algebra depends on
+every selectable unit behaving nicely with others.."""
+import testbase
+from sqlalchemy import *
+from testlib import *
+
+metadata = MetaData()
+table = Table('table1', metadata, 
+    Column('col1', Integer, primary_key=True),
+    Column('col2', String(20)),
+    Column('col3', Integer),
+    Column('colx', Integer),
+    
+)
+
+table2 = Table('table2', metadata,
+    Column('col1', Integer, primary_key=True),
+    Column('col2', Integer, ForeignKey('table1.col1')),
+    Column('col3', String(20)),
+    Column('coly', Integer),
+)
+
+class SelectableTest(AssertMixin):
+    def testdistance(self):
+        s = select([table.c.col1.label('c2'), table.c.col1, table.c.col1.label('c1')])
+
+        # didnt do this yet...col.label().make_proxy() has same "distance" as col.make_proxy() so far
+        #assert s.corresponding_column(table.c.col1) is s.c.col1
+        assert s.corresponding_column(s.c.col1) is s.c.col1
+        assert s.corresponding_column(s.c.c1) is s.c.c1
+        
+    def testjoinagainstself(self):
+        jj = select([table.c.col1.label('bar_col1')])
+        jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
+        
+        # test column directly agaisnt itself
+        assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
+
+        assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1
+        
+        # test alias of the join, targets the column with the least 
+        # "distance" between the requested column and the returned column
+        # (i.e. there is less indirection between j2.c.table1_col1 and table.c.col1, than
+        # there is from j2.c.bar_col1 to table.c.col1)
+        j2 = jjj.alias('foo')
+        assert j2.corresponding_column(table.c.col1) is j2.c.table1_col1
+        
+
+    def testjoinagainstjoin(self):
+        j  = outerjoin(table, table2, table.c.col1==table2.c.col2)
+        jj = select([ table.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')
+        jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
+        assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
+        
+        j2 = jjj.alias('foo')
+        print j2.corresponding_column(jjj.c.table1_col1)
+        assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1
+        
+        assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1
+        
+    def testtablealias(self):
+        a = table.alias('a')
+        
+        j = join(a, table2)
+        
+        criterion = a.c.col1 == table2.c.col2
+        print
+        print str(j)
+        self.assert_(criterion.compare(j.onclause))
+
+    def testunion(self):
+        # tests that we can correspond a column in a Select statement with a certain Table, against
+        # a column in a Union where one of its underlying Selects matches to that same Table
+        u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+                select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
+            )
+        s1 = table.select(use_labels=True)
+        s2 = table2.select(use_labels=True)
+        print ["%d %s" % (id(c),c.key) for c in u.c]
+        c = u.corresponding_column(s1.c.table1_col2)
+        print "%d %s" % (id(c), c.key)
+        print id(u.corresponding_column(s1.c.table1_col2).table)
+        print id(u.c.col2.table)
+        assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
+        assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
+
+    def testaliasunion(self):
+        # same as testunion, except its an alias of the union
+        u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+                select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
+            ).alias('analias')
+        s1 = table.select(use_labels=True)
+        s2 = table2.select(use_labels=True)
+        assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
+        assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
+        assert u.corresponding_column(s2.c.table2_coly) is u.c.coly
+        assert s2.corresponding_column(u.c.coly) is s2.c.table2_coly
+
+    def testselectunion(self):
+        # like testaliasunion, but off a Select off the union.
+        u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+                select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
+            ).alias('analias')
+        s = select([u])
+        s1 = table.select(use_labels=True)
+        s2 = table2.select(use_labels=True)
+        assert s.corresponding_column(s1.c.table1_col2) is s.c.col2
+        assert s.corresponding_column(s2.c.table2_col2) is s.c.col2
+
+    def testunionagainstjoin(self):
+        # same as testunion, except its an alias of the union
+        u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+                select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
+            ).alias('analias')
+        j1 = table.join(table2)
+        assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
+        assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
+        
+    def testjoin(self):
+        a = join(table, table2)
+        print str(a.select(use_labels=True))
+        b = table2.alias('b')
+        j = join(a, b)
+        print str(j)
+        criterion = a.c.table1_col1 == b.c.col2
+        self.assert_(criterion.compare(j.onclause))
+
+    def testselectalias(self):
+        a = table.select().alias('a')
+        print str(a.select())
+        j = join(a, table2)
+        
+        criterion = a.c.col1 == table2.c.col2
+        print criterion
+        print j.onclause
+        self.assert_(criterion.compare(j.onclause))
+
+    def testselectlabels(self):
+        a = table.select(use_labels=True)
+        print str(a.select())
+        j = join(a, table2)
+        
+        criterion = a.c.table1_col1 == table2.c.col2
+        print
+        print str(j)
+        self.assert_(criterion.compare(j.onclause))
+
+    def testcolumnlabels(self):
+        a = select([table.c.col1.label('acol1'), table.c.col2.label('acol2'), table.c.col3.label('acol3')])
+        print str(a)
+        print [c for c in a.columns]
+        print str(a.select())
+        j = join(a, table2)
+        criterion = a.c.acol1 == table2.c.col2
+        print str(j)
+        self.assert_(criterion.compare(j.onclause))
+        
+    def testselectaliaslabels(self):
+        a = table2.select(use_labels=True).alias('a')
+        print str(a.select())
+        j = join(a, table)
+        
+        criterion =  table.c.col1 == a.c.table2_col2
+        print str(criterion)
+        print str(j.onclause)
+        self.assert_(criterion.compare(j.onclause))
+        
+
+class PrimaryKeyTest(AssertMixin):
+    def test_join_pk_collapse_implicit(self):
+        """test that redundant columns in a join get 'collapsed' into a minimal primary key, 
+        which is the root column along a chain of foreign key relationships."""
+        
+        meta = MetaData()
+        a = Table('a', meta, Column('id', Integer, primary_key=True))
+        b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True))
+        c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True))
+        d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True))
+
+        assert c.c.id.references(b.c.id)
+        assert not d.c.id.references(a.c.id)
+        
+        assert list(a.join(b).primary_key) == [a.c.id]
+        assert list(b.join(c).primary_key) == [b.c.id]
+        assert list(a.join(b).join(c).primary_key) == [a.c.id]
+        assert list(b.join(c).join(d).primary_key) == [b.c.id]
+        assert list(d.join(c).join(b).primary_key) == [b.c.id]
+        assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id]
+
+    def test_join_pk_collapse_explicit(self):
+        """test that redundant columns in a join get 'collapsed' into a minimal primary key, 
+        which is the root column along a chain of explicit join conditions."""
+
+        meta = MetaData()
+        a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
+        b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer))
+        c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True), Column('x', Integer))
+        d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True), Column('x', Integer))
+
+        print list(a.join(b, a.c.x==b.c.id).primary_key)
+        assert list(a.join(b, a.c.x==b.c.id).primary_key) == [b.c.id]
+        assert list(b.join(c, b.c.x==c.c.id).primary_key) == [b.c.id]
+        assert list(a.join(b).join(c, c.c.id==b.c.x).primary_key) == [a.c.id]
+        assert list(b.join(c, c.c.x==b.c.id).join(d).primary_key) == [c.c.id]
+        assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id]
+        assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [c.c.id]
+        assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id]
+        
+        assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id]
+    
+    def test_init_doesnt_blowitaway(self):
+        meta = MetaData()
+        a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
+        b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer))
+
+        j = a.join(b)
+        assert list(j.primary_key) == [a.c.id]
+        
+        j.foreign_keys
+        assert list(j.primary_key) == [a.c.id]
+
+    def test_non_column_clause(self):
+        meta = MetaData()
+        a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
+        b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer, primary_key=True))
+
+        j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5))
+        assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :b_x", str(j)
+        assert list(j.primary_key) == [a.c.id, b.c.x]
+
+if __name__ == "__main__":
+    testbase.main()
+