-import sys\r
-sys.path = ['../../lib', './lib/'] + sys.path\r
-\r
-import os\r
-import re\r
-import doctest\r
-import sqlalchemy.util as util\r
-import sqlalchemy.logging as salog\r
-import logging\r
-\r
-salog.default_enabled=True\r
-rootlogger = logging.getLogger('sqlalchemy')\r
-rootlogger.setLevel(logging.NOTSET)\r
-class MyStream(object):\r
- def write(self, string):\r
- sys.stdout.write(string)\r
- sys.stdout.flush()\r
- def flush(self):\r
- pass\r
-handler = logging.StreamHandler(MyStream())\r
-handler.setFormatter(logging.Formatter('%(message)s'))\r
-rootlogger.addHandler(handler)\r
-\r
-\r
-def teststring(s, name, globs=None, verbose=None, report=True, \r
- optionflags=0, extraglobs=None, raise_on_error=False, \r
- parser=doctest.DocTestParser()):\r
-\r
- from doctest import DebugRunner, DocTestRunner, master\r
-\r
- # Assemble the globals.\r
- if globs is None:\r
- globs = {}\r
- else:\r
- globs = globs.copy()\r
- if extraglobs is not None:\r
- globs.update(extraglobs)\r
-\r
- if raise_on_error:\r
- runner = DebugRunner(verbose=verbose, optionflags=optionflags)\r
- else:\r
- runner = DocTestRunner(verbose=verbose, optionflags=optionflags)\r
-\r
- test = parser.get_doctest(s, globs, name, name, 0)\r
- runner.run(test)\r
-\r
- if report:\r
- runner.summarize()\r
-\r
- if master is None:\r
- master = runner\r
- else:\r
- master.merge(runner)\r
-\r
- return runner.failures, runner.tries\r
-\r
-def replace_file(s, newfile):\r
- engine = r"'(sqlite|postgres|mysql):///.*'"\r
- engine = re.compile(engine, re.MULTILINE)\r
- s, n = re.subn(engine, "'sqlite:///" + newfile + "'", s)\r
- if not n:\r
- raise ValueError("Couldn't find suitable create_engine call to replace '%s' in it" % oldfile)\r
- return s\r
-\r
-for filename in ('ormtutorial', 'sqlexpression'):\r
- filename = 'content/%s.txt' % filename\r
- s = open(filename).read()\r
- #s = replace_file(s, ':memory:')\r
- s = re.sub(r'{(?:stop|sql|opensql)}', '', s)\r
- teststring(s, filename)\r
-\r
+import sys
+sys.path = ['../../lib', './lib/'] + sys.path
+
+import os
+import re
+import doctest
+import sqlalchemy.util as util
+import sqlalchemy.logging as salog
+import logging
+
+salog.default_enabled=True
+rootlogger = logging.getLogger('sqlalchemy')
+rootlogger.setLevel(logging.NOTSET)
+class MyStream(object):
+ def write(self, string):
+ sys.stdout.write(string)
+ sys.stdout.flush()
+ def flush(self):
+ pass
+handler = logging.StreamHandler(MyStream())
+handler.setFormatter(logging.Formatter('%(message)s'))
+rootlogger.addHandler(handler)
+
+
+def teststring(s, name, globs=None, verbose=None, report=True,
+ optionflags=0, extraglobs=None, raise_on_error=False,
+ parser=doctest.DocTestParser()):
+
+ from doctest import DebugRunner, DocTestRunner, master
+
+ # Assemble the globals.
+ if globs is None:
+ globs = {}
+ else:
+ globs = globs.copy()
+ if extraglobs is not None:
+ globs.update(extraglobs)
+
+ if raise_on_error:
+ runner = DebugRunner(verbose=verbose, optionflags=optionflags)
+ else:
+ runner = DocTestRunner(verbose=verbose, optionflags=optionflags)
+
+ test = parser.get_doctest(s, globs, name, name, 0)
+ runner.run(test)
+
+ if report:
+ runner.summarize()
+
+ if master is None:
+ master = runner
+ else:
+ master.merge(runner)
+
+ return runner.failures, runner.tries
+
+def replace_file(s, newfile):
+ engine = r"'(sqlite|postgres|mysql):///.*'"
+ engine = re.compile(engine, re.MULTILINE)
+ s, n = re.subn(engine, "'sqlite:///" + newfile + "'", s)
+ if not n:
+ raise ValueError("Couldn't find suitable create_engine call to replace '%s' in it" % oldfile)
+ return s
+
+for filename in ('ormtutorial', 'sqlexpression'):
+ filename = 'content/%s.txt' % filename
+ s = open(filename).read()
+ #s = replace_file(s, ':memory:')
+ s = re.sub(r'{(?:stop|sql|opensql)}', '', s)
+ teststring(s, filename)
+
-"""tests that various From objects properly export their columns, as well as\r
-useable primary keys and foreign keys. Full relational algebra depends on\r
-every selectable unit behaving nicely with others.."""\r
- \r
-import testbase\r
-from sqlalchemy import *\r
-from testlib import *\r
-\r
-metadata = MetaData()\r
-table = Table('table1', metadata, \r
- Column('col1', Integer, primary_key=True),\r
- Column('col2', String(20)),\r
- Column('col3', Integer),\r
- Column('colx', Integer),\r
- \r
-)\r
-\r
-table2 = Table('table2', metadata,\r
- Column('col1', Integer, primary_key=True),\r
- Column('col2', Integer, ForeignKey('table1.col1')),\r
- Column('col3', String(20)),\r
- Column('coly', Integer),\r
-)\r
-\r
-class SelectableTest(AssertMixin):\r
- def testdistance(self):\r
- s = select([table.c.col1.label('c2'), table.c.col1, table.c.col1.label('c1')])\r
-\r
- # didnt do this yet...col.label().make_proxy() has same "distance" as col.make_proxy() so far\r
- #assert s.corresponding_column(table.c.col1) is s.c.col1\r
- assert s.corresponding_column(s.c.col1) is s.c.col1\r
- assert s.corresponding_column(s.c.c1) is s.c.c1\r
- \r
- def testjoinagainstself(self):\r
- jj = select([table.c.col1.label('bar_col1')])\r
- jjj = join(table, jj, table.c.col1==jj.c.bar_col1)\r
- \r
- # test column directly agaisnt itself\r
- assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1\r
-\r
- assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1\r
- \r
- # test alias of the join, targets the column with the least \r
- # "distance" between the requested column and the returned column\r
- # (i.e. there is less indirection between j2.c.table1_col1 and table.c.col1, than\r
- # there is from j2.c.bar_col1 to table.c.col1)\r
- j2 = jjj.alias('foo')\r
- assert j2.corresponding_column(table.c.col1) is j2.c.table1_col1\r
- \r
-\r
- def testjoinagainstjoin(self):\r
- j = outerjoin(table, table2, table.c.col1==table2.c.col2)\r
- jj = select([ table.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')\r
- jjj = join(table, jj, table.c.col1==jj.c.bar_col1)\r
- assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1\r
- \r
- j2 = jjj.alias('foo')\r
- print j2.corresponding_column(jjj.c.table1_col1)\r
- assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1\r
- \r
- assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1\r
- \r
- def testtablealias(self):\r
- a = table.alias('a')\r
- \r
- j = join(a, table2)\r
- \r
- criterion = a.c.col1 == table2.c.col2\r
- print\r
- print str(j)\r
- self.assert_(criterion.compare(j.onclause))\r
-\r
- def testunion(self):\r
- # tests that we can correspond a column in a Select statement with a certain Table, against\r
- # a column in a Union where one of its underlying Selects matches to that same Table\r
- u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(\r
- select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])\r
- )\r
- s1 = table.select(use_labels=True)\r
- s2 = table2.select(use_labels=True)\r
- print ["%d %s" % (id(c),c.key) for c in u.c]\r
- c = u.corresponding_column(s1.c.table1_col2)\r
- print "%d %s" % (id(c), c.key)\r
- print id(u.corresponding_column(s1.c.table1_col2).table)\r
- print id(u.c.col2.table)\r
- assert u.corresponding_column(s1.c.table1_col2) is u.c.col2\r
- assert u.corresponding_column(s2.c.table2_col2) is u.c.col2\r
-\r
- def testaliasunion(self):\r
- # same as testunion, except its an alias of the union\r
- u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(\r
- select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])\r
- ).alias('analias')\r
- s1 = table.select(use_labels=True)\r
- s2 = table2.select(use_labels=True)\r
- assert u.corresponding_column(s1.c.table1_col2) is u.c.col2\r
- assert u.corresponding_column(s2.c.table2_col2) is u.c.col2\r
- assert u.corresponding_column(s2.c.table2_coly) is u.c.coly\r
- assert s2.corresponding_column(u.c.coly) is s2.c.table2_coly\r
-\r
- def testselectunion(self):\r
- # like testaliasunion, but off a Select off the union.\r
- u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(\r
- select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])\r
- ).alias('analias')\r
- s = select([u])\r
- s1 = table.select(use_labels=True)\r
- s2 = table2.select(use_labels=True)\r
- assert s.corresponding_column(s1.c.table1_col2) is s.c.col2\r
- assert s.corresponding_column(s2.c.table2_col2) is s.c.col2\r
-\r
- def testunionagainstjoin(self):\r
- # same as testunion, except its an alias of the union\r
- u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(\r
- select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])\r
- ).alias('analias')\r
- j1 = table.join(table2)\r
- assert u.corresponding_column(j1.c.table1_colx) is u.c.colx\r
- assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx\r
- \r
- def testjoin(self):\r
- a = join(table, table2)\r
- print str(a.select(use_labels=True))\r
- b = table2.alias('b')\r
- j = join(a, b)\r
- print str(j)\r
- criterion = a.c.table1_col1 == b.c.col2\r
- self.assert_(criterion.compare(j.onclause))\r
-\r
- def testselectalias(self):\r
- a = table.select().alias('a')\r
- print str(a.select())\r
- j = join(a, table2)\r
- \r
- criterion = a.c.col1 == table2.c.col2\r
- print criterion\r
- print j.onclause\r
- self.assert_(criterion.compare(j.onclause))\r
-\r
- def testselectlabels(self):\r
- a = table.select(use_labels=True)\r
- print str(a.select())\r
- j = join(a, table2)\r
- \r
- criterion = a.c.table1_col1 == table2.c.col2\r
- print\r
- print str(j)\r
- self.assert_(criterion.compare(j.onclause))\r
-\r
- def testcolumnlabels(self):\r
- a = select([table.c.col1.label('acol1'), table.c.col2.label('acol2'), table.c.col3.label('acol3')])\r
- print str(a)\r
- print [c for c in a.columns]\r
- print str(a.select())\r
- j = join(a, table2)\r
- criterion = a.c.acol1 == table2.c.col2\r
- print str(j)\r
- self.assert_(criterion.compare(j.onclause))\r
- \r
- def testselectaliaslabels(self):\r
- a = table2.select(use_labels=True).alias('a')\r
- print str(a.select())\r
- j = join(a, table)\r
- \r
- criterion = table.c.col1 == a.c.table2_col2\r
- print str(criterion)\r
- print str(j.onclause)\r
- self.assert_(criterion.compare(j.onclause))\r
- \r
-\r
-class PrimaryKeyTest(AssertMixin):\r
- def test_join_pk_collapse_implicit(self):\r
- """test that redundant columns in a join get 'collapsed' into a minimal primary key, \r
- which is the root column along a chain of foreign key relationships."""\r
- \r
- meta = MetaData()\r
- a = Table('a', meta, Column('id', Integer, primary_key=True))\r
- b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True))\r
- c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True))\r
- d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True))\r
-\r
- assert c.c.id.references(b.c.id)\r
- assert not d.c.id.references(a.c.id)\r
- \r
- assert list(a.join(b).primary_key) == [a.c.id]\r
- assert list(b.join(c).primary_key) == [b.c.id]\r
- assert list(a.join(b).join(c).primary_key) == [a.c.id]\r
- assert list(b.join(c).join(d).primary_key) == [b.c.id]\r
- assert list(d.join(c).join(b).primary_key) == [b.c.id]\r
- assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id]\r
-\r
- def test_join_pk_collapse_explicit(self):\r
- """test that redundant columns in a join get 'collapsed' into a minimal primary key, \r
- which is the root column along a chain of explicit join conditions."""\r
-\r
- meta = MetaData()\r
- a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))\r
- b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer))\r
- c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True), Column('x', Integer))\r
- d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True), Column('x', Integer))\r
-\r
- print list(a.join(b, a.c.x==b.c.id).primary_key)\r
- assert list(a.join(b, a.c.x==b.c.id).primary_key) == [b.c.id]\r
- assert list(b.join(c, b.c.x==c.c.id).primary_key) == [b.c.id]\r
- assert list(a.join(b).join(c, c.c.id==b.c.x).primary_key) == [a.c.id]\r
- assert list(b.join(c, c.c.x==b.c.id).join(d).primary_key) == [c.c.id]\r
- assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id]\r
- assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [c.c.id]\r
- assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id]\r
- \r
- assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id]\r
- \r
- def test_init_doesnt_blowitaway(self):\r
- meta = MetaData()\r
- a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))\r
- b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer))\r
-\r
- j = a.join(b)\r
- assert list(j.primary_key) == [a.c.id]\r
- \r
- j.foreign_keys\r
- assert list(j.primary_key) == [a.c.id]\r
-\r
- def test_non_column_clause(self):\r
- meta = MetaData()\r
- a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))\r
- b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer, primary_key=True))\r
-\r
- j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5))\r
- assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :b_x", str(j)\r
- assert list(j.primary_key) == [a.c.id, b.c.x]\r
-\r
-if __name__ == "__main__":\r
- testbase.main()\r
-\r
+"""tests that various From objects properly export their columns, as well as
+useable primary keys and foreign keys. Full relational algebra depends on
+every selectable unit behaving nicely with others.."""
+
+import testbase
+from sqlalchemy import *
+from testlib import *
+
+metadata = MetaData()
+table = Table('table1', metadata,
+ Column('col1', Integer, primary_key=True),
+ Column('col2', String(20)),
+ Column('col3', Integer),
+ Column('colx', Integer),
+
+)
+
+table2 = Table('table2', metadata,
+ Column('col1', Integer, primary_key=True),
+ Column('col2', Integer, ForeignKey('table1.col1')),
+ Column('col3', String(20)),
+ Column('coly', Integer),
+)
+
+class SelectableTest(AssertMixin):
+ def testdistance(self):
+ s = select([table.c.col1.label('c2'), table.c.col1, table.c.col1.label('c1')])
+
+ # didnt do this yet...col.label().make_proxy() has same "distance" as col.make_proxy() so far
+ #assert s.corresponding_column(table.c.col1) is s.c.col1
+ assert s.corresponding_column(s.c.col1) is s.c.col1
+ assert s.corresponding_column(s.c.c1) is s.c.c1
+
+ def testjoinagainstself(self):
+ jj = select([table.c.col1.label('bar_col1')])
+ jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
+
+ # test column directly agaisnt itself
+ assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
+
+ assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1
+
+ # test alias of the join, targets the column with the least
+ # "distance" between the requested column and the returned column
+ # (i.e. there is less indirection between j2.c.table1_col1 and table.c.col1, than
+ # there is from j2.c.bar_col1 to table.c.col1)
+ j2 = jjj.alias('foo')
+ assert j2.corresponding_column(table.c.col1) is j2.c.table1_col1
+
+
+ def testjoinagainstjoin(self):
+ j = outerjoin(table, table2, table.c.col1==table2.c.col2)
+ jj = select([ table.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')
+ jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
+ assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
+
+ j2 = jjj.alias('foo')
+ print j2.corresponding_column(jjj.c.table1_col1)
+ assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1
+
+ assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1
+
+ def testtablealias(self):
+ a = table.alias('a')
+
+ j = join(a, table2)
+
+ criterion = a.c.col1 == table2.c.col2
+ print
+ print str(j)
+ self.assert_(criterion.compare(j.onclause))
+
+ def testunion(self):
+ # tests that we can correspond a column in a Select statement with a certain Table, against
+ # a column in a Union where one of its underlying Selects matches to that same Table
+ u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
+ )
+ s1 = table.select(use_labels=True)
+ s2 = table2.select(use_labels=True)
+ print ["%d %s" % (id(c),c.key) for c in u.c]
+ c = u.corresponding_column(s1.c.table1_col2)
+ print "%d %s" % (id(c), c.key)
+ print id(u.corresponding_column(s1.c.table1_col2).table)
+ print id(u.c.col2.table)
+ assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
+ assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
+
+ def testaliasunion(self):
+ # same as testunion, except its an alias of the union
+ u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
+ ).alias('analias')
+ s1 = table.select(use_labels=True)
+ s2 = table2.select(use_labels=True)
+ assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
+ assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
+ assert u.corresponding_column(s2.c.table2_coly) is u.c.coly
+ assert s2.corresponding_column(u.c.coly) is s2.c.table2_coly
+
+ def testselectunion(self):
+ # like testaliasunion, but off a Select off the union.
+ u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
+ ).alias('analias')
+ s = select([u])
+ s1 = table.select(use_labels=True)
+ s2 = table2.select(use_labels=True)
+ assert s.corresponding_column(s1.c.table1_col2) is s.c.col2
+ assert s.corresponding_column(s2.c.table2_col2) is s.c.col2
+
+ def testunionagainstjoin(self):
+ # same as testunion, except its an alias of the union
+ u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
+ ).alias('analias')
+ j1 = table.join(table2)
+ assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
+ assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
+
+ def testjoin(self):
+ a = join(table, table2)
+ print str(a.select(use_labels=True))
+ b = table2.alias('b')
+ j = join(a, b)
+ print str(j)
+ criterion = a.c.table1_col1 == b.c.col2
+ self.assert_(criterion.compare(j.onclause))
+
+ def testselectalias(self):
+ a = table.select().alias('a')
+ print str(a.select())
+ j = join(a, table2)
+
+ criterion = a.c.col1 == table2.c.col2
+ print criterion
+ print j.onclause
+ self.assert_(criterion.compare(j.onclause))
+
+ def testselectlabels(self):
+ a = table.select(use_labels=True)
+ print str(a.select())
+ j = join(a, table2)
+
+ criterion = a.c.table1_col1 == table2.c.col2
+ print
+ print str(j)
+ self.assert_(criterion.compare(j.onclause))
+
+ def testcolumnlabels(self):
+ a = select([table.c.col1.label('acol1'), table.c.col2.label('acol2'), table.c.col3.label('acol3')])
+ print str(a)
+ print [c for c in a.columns]
+ print str(a.select())
+ j = join(a, table2)
+ criterion = a.c.acol1 == table2.c.col2
+ print str(j)
+ self.assert_(criterion.compare(j.onclause))
+
+ def testselectaliaslabels(self):
+ a = table2.select(use_labels=True).alias('a')
+ print str(a.select())
+ j = join(a, table)
+
+ criterion = table.c.col1 == a.c.table2_col2
+ print str(criterion)
+ print str(j.onclause)
+ self.assert_(criterion.compare(j.onclause))
+
+
+class PrimaryKeyTest(AssertMixin):
+ def test_join_pk_collapse_implicit(self):
+ """test that redundant columns in a join get 'collapsed' into a minimal primary key,
+ which is the root column along a chain of foreign key relationships."""
+
+ meta = MetaData()
+ a = Table('a', meta, Column('id', Integer, primary_key=True))
+ b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True))
+ c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True))
+ d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True))
+
+ assert c.c.id.references(b.c.id)
+ assert not d.c.id.references(a.c.id)
+
+ assert list(a.join(b).primary_key) == [a.c.id]
+ assert list(b.join(c).primary_key) == [b.c.id]
+ assert list(a.join(b).join(c).primary_key) == [a.c.id]
+ assert list(b.join(c).join(d).primary_key) == [b.c.id]
+ assert list(d.join(c).join(b).primary_key) == [b.c.id]
+ assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id]
+
+ def test_join_pk_collapse_explicit(self):
+ """test that redundant columns in a join get 'collapsed' into a minimal primary key,
+ which is the root column along a chain of explicit join conditions."""
+
+ meta = MetaData()
+ a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
+ b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer))
+ c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True), Column('x', Integer))
+ d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True), Column('x', Integer))
+
+ print list(a.join(b, a.c.x==b.c.id).primary_key)
+ assert list(a.join(b, a.c.x==b.c.id).primary_key) == [b.c.id]
+ assert list(b.join(c, b.c.x==c.c.id).primary_key) == [b.c.id]
+ assert list(a.join(b).join(c, c.c.id==b.c.x).primary_key) == [a.c.id]
+ assert list(b.join(c, c.c.x==b.c.id).join(d).primary_key) == [c.c.id]
+ assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id]
+ assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [c.c.id]
+ assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id]
+
+ assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id]
+
+ def test_init_doesnt_blowitaway(self):
+ meta = MetaData()
+ a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
+ b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer))
+
+ j = a.join(b)
+ assert list(j.primary_key) == [a.c.id]
+
+ j.foreign_keys
+ assert list(j.primary_key) == [a.c.id]
+
+ def test_non_column_clause(self):
+ meta = MetaData()
+ a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
+ b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer, primary_key=True))
+
+ j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5))
+ assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :b_x", str(j)
+ assert list(j.primary_key) == [a.c.id, b.c.x]
+
+if __name__ == "__main__":
+ testbase.main()
+