From a6b62cc3fed5f06d3428b1f6ee13756175ded61b Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sun, 11 Jul 2010 13:15:51 -0400 Subject: [PATCH] Python-tidy test/engine and test/aaa_profiling, 80% auto + 20% manual intervention --- .../custom_attributes/listen_for_events.py | 3 +- test/aaa_profiling/test_memusage.py | 83 ++- test/aaa_profiling/test_orm.py | 79 ++- test/aaa_profiling/test_resultset.py | 41 +- test/aaa_profiling/test_zoomark.py | 424 +++++++------ test/aaa_profiling/test_zoomark_orm.py | 371 ++++++----- test/engine/test_bind.py | 40 +- test/engine/test_ddlevents.py | 79 +-- test/engine/test_execute.py | 317 ++++++---- test/engine/test_metadata.py | 93 ++- test/engine/test_parseconnect.py | 182 ++++-- test/engine/test_pool.py | 158 ++--- test/engine/test_reconnect.py | 86 ++- test/engine/test_reflection.py | 500 ++++++++------- test/engine/test_transaction.py | 585 +++++++++--------- 15 files changed, 1658 insertions(+), 1383 deletions(-) diff --git a/examples/custom_attributes/listen_for_events.py b/examples/custom_attributes/listen_for_events.py index 0027657393..6d467fbbc1 100644 --- a/examples/custom_attributes/listen_for_events.py +++ b/examples/custom_attributes/listen_for_events.py @@ -4,7 +4,8 @@ across the board. """ -from sqlalchemy.orm.interfaces import AttributeExtension, InstrumentationManager +from sqlalchemy.orm.interfaces import AttributeExtension, \ + InstrumentationManager class InstallListeners(InstrumentationManager): def post_configure_attribute(self, class_, key, inst): diff --git a/test/aaa_profiling/test_memusage.py b/test/aaa_profiling/test_memusage.py index 3a0535d345..5fa40a9978 100644 --- a/test/aaa_profiling/test_memusage.py +++ b/test/aaa_profiling/test_memusage.py @@ -1,13 +1,13 @@ from sqlalchemy.test.testing import eq_ -from sqlalchemy.orm import mapper, relationship, create_session,\ - clear_mappers, sessionmaker, class_mapper +from sqlalchemy.orm import mapper, relationship, create_session, \ + clear_mappers, sessionmaker, class_mapper from sqlalchemy.orm.mapper import _mapper_registry from sqlalchemy.orm.session import _sessions from sqlalchemy.util import jython import operator from sqlalchemy.test import testing, engines from sqlalchemy import MetaData, Integer, String, ForeignKey, \ - PickleType, create_engine, Unicode + PickleType, create_engine, Unicode from sqlalchemy.test.schema import Table, Column import sqlalchemy as sa from sqlalchemy.sql import column @@ -253,28 +253,29 @@ class MemUsageTest(EnsureZeroed): finally: metadata.drop_all() - @testing.fails_if(lambda: - testing.db.dialect.name == 'sqlite' and - testing.db.dialect.dbapi.version_info >= (2,5), - "Newer pysqlites generate warnings here too and have similar issues." - ) + @testing.fails_if(lambda : testing.db.dialect.name == 'sqlite' \ + and testing.db.dialect.dbapi.version_info >= (2, + 5), + 'Newer pysqlites generate warnings here too and ' + 'have similar issues.') def test_unicode_warnings(self): metadata = MetaData(testing.db) - table1 = Table("mytable", metadata, - Column('col1', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('col2', Unicode(30))) - + table1 = Table('mytable', metadata, Column('col1', Integer, + primary_key=True, + test_needs_autoincrement=True), Column('col2', + Unicode(30))) metadata.create_all() - i = [1] + @testing.emits_warning() @profile_memory def go(): - # execute with a non-unicode object. - # a warning is emitted, this warning shouldn't - # clog up memory. - testing.db.execute(table1.select().where(table1.c.col2=='foo%d' % i[0])) + + # execute with a non-unicode object. a warning is emitted, + # this warning shouldn't clog up memory. + + testing.db.execute(table1.select().where(table1.c.col2 + == 'foo%d' % i[0])) i[0] += 1 try: go() @@ -466,41 +467,35 @@ class MemUsageTest(EnsureZeroed): metadata.drop_all() assert_no_mappers() - # fails on newer versions of pysqlite due to unusual memory - # behvior in pysqlite itself. - # background at: http://thread.gmane.org/gmane.comp.python.db.pysqlite.user/2290 - @testing.fails_if(lambda: - testing.db.dialect.name == 'sqlite' and - testing.db.dialect.dbapi.version > '2.5') + # fails on newer versions of pysqlite due to unusual memory behvior + # in pysqlite itself. background at: + # http://thread.gmane.org/gmane.comp.python.db.pysqlite.user/2290 + + @testing.fails_if(lambda : testing.db.dialect.name == 'sqlite' \ + and testing.db.dialect.dbapi.version > '2.5') def test_join_cache(self): metadata = MetaData(testing.db) + table1 = Table('table1', metadata, Column('id', Integer, + primary_key=True, + test_needs_autoincrement=True), Column('data', + String(30))) + table2 = Table('table2', metadata, Column('id', Integer, + primary_key=True, + test_needs_autoincrement=True), Column('data', + String(30)), Column('t1id', Integer, + ForeignKey('table1.id'))) - table1 = Table("table1", metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('data', String(30)) - ) - - table2 = Table("table2", metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('data', String(30)), - Column('t1id', Integer, ForeignKey('table1.id')) - ) - class Foo(object): pass - + class Bar(object): pass - - mapper(Foo, table1, properties={ - 'bars':relationship(mapper(Bar, table2)) - }) - metadata.create_all() + mapper(Foo, table1, properties={'bars' + : relationship(mapper(Bar, table2))}) + metadata.create_all() session = sessionmaker() - + @profile_memory def go(): s = table2.select() diff --git a/test/aaa_profiling/test_orm.py b/test/aaa_profiling/test_orm.py index 0395cb92d6..f2b876837c 100644 --- a/test/aaa_profiling/test_orm.py +++ b/test/aaa_profiling/test_orm.py @@ -1,75 +1,70 @@ -from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message +from sqlalchemy.test.testing import eq_, assert_raises, \ + assert_raises_message from sqlalchemy import exc as sa_exc, util, Integer, String, ForeignKey -from sqlalchemy.orm import exc as orm_exc, mapper, relationship, sessionmaker - +from sqlalchemy.orm import exc as orm_exc, mapper, relationship, \ + sessionmaker from sqlalchemy.test import testing, profiling from test.orm import _base from sqlalchemy.test.schema import Table, Column class MergeTest(_base.MappedTest): + @classmethod def define_tables(cls, metadata): - parent = Table('parent', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('data', String(20)) - ) - - child = Table('child', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('data', String(20)), - Column('parent_id', Integer, ForeignKey('parent.id'), nullable=False) - ) - + parent = Table('parent', metadata, Column('id', Integer, + primary_key=True, + test_needs_autoincrement=True), Column('data', + String(20))) + child = Table('child', metadata, Column('id', Integer, + primary_key=True, test_needs_autoincrement=True), + Column('data', String(20)), Column('parent_id', + Integer, ForeignKey('parent.id'), nullable=False)) @classmethod def setup_classes(cls): class Parent(_base.BasicEntity): pass + class Child(_base.BasicEntity): pass @classmethod @testing.resolve_artifact_names def setup_mappers(cls): - mapper(Parent, parent, properties={ - 'children':relationship(Child, backref='parent') - }) + mapper(Parent, parent, properties={'children' + : relationship(Child, backref='parent')}) mapper(Child, child) @classmethod @testing.resolve_artifact_names def insert_data(cls): - parent.insert().execute( - {'id':1, 'data':'p1'}, - ) - child.insert().execute( - {'id':1, 'data':'p1c1', 'parent_id':1}, - ) - + parent.insert().execute({'id': 1, 'data': 'p1'}) + child.insert().execute({'id': 1, 'data': 'p1c1', 'parent_id' + : 1}) + @testing.resolve_artifact_names def test_merge_no_load(self): sess = sessionmaker()() sess2 = sessionmaker()() - p1 = sess.query(Parent).get(1) p1.children - - # down from 185 on this - # this is a small slice of a usually bigger - # operation so using a small variance - @profiling.function_call_count(95, variance=0.001, versions={'2.4':67, '3':96}) + + # down from 185 on this this is a small slice of a usually + # bigger operation so using a small variance + + @profiling.function_call_count(95, variance=0.001, + versions={'2.4': 67, '3': 96}) def go(): return sess2.merge(p1, load=False) - p2 = go() - # third call, merge object already present. - # almost no calls. - @profiling.function_call_count(12, variance=0.001, versions={'2.4':8, '3':13}) + # third call, merge object already present. almost no calls. + + @profiling.function_call_count(12, variance=0.001, + versions={'2.4': 8, '3': 13}) def go(): return sess2.merge(p2, load=False) - p3 = go() @testing.only_on('sqlite', 'Call counts tailored to pysqlite') @@ -77,19 +72,19 @@ class MergeTest(_base.MappedTest): def test_merge_load(self): sess = sessionmaker()() sess2 = sessionmaker()() - p1 = sess.query(Parent).get(1) p1.children - # preloading of collection took this down from 1728 - # to 1192 using sqlite3 - # the C extension took it back up to approx. 1257 (py2.6) - @profiling.function_call_count(1257, versions={'2.4':807}) + # preloading of collection took this down from 1728 to 1192 + # using sqlite3 the C extension took it back up to approx. 1257 + # (py2.6) + + @profiling.function_call_count(1257, versions={'2.4': 807}) def go(): p2 = sess2.merge(p1) go() - + # one more time, count the SQL + sess2 = sessionmaker()() self.assert_sql_count(testing.db, go, 2) - diff --git a/test/aaa_profiling/test_resultset.py b/test/aaa_profiling/test_resultset.py index 459a8e4c4d..d71b8dab84 100644 --- a/test/aaa_profiling/test_resultset.py +++ b/test/aaa_profiling/test_resultset.py @@ -1,41 +1,42 @@ from sqlalchemy import * from sqlalchemy.test import * - NUM_FIELDS = 10 NUM_RECORDS = 1000 + class ResultSetTest(TestBase, AssertsExecutionResults): + __only_on__ = 'sqlite' - + @classmethod def setup_class(cls): global t, t2, metadata - metadata = MetaData(testing.db) - t = Table('table', metadata, *[Column("field%d" % fnum, String) for fnum in range(NUM_FIELDS)]) - t2 = Table('table2', metadata, *[Column("field%d" % fnum, Unicode) for fnum in range(NUM_FIELDS)]) + t = Table('table', metadata, *[Column('field%d' % fnum, String) + for fnum in range(NUM_FIELDS)]) + t2 = Table('table2', metadata, *[Column('field%d' % fnum, + Unicode) for fnum in range(NUM_FIELDS)]) def setup(self): metadata.create_all() - t.insert().execute( - [dict(("field%d" % fnum, u"value%d" % fnum) - for fnum in range(NUM_FIELDS)) for r_num in range(NUM_RECORDS)] - ) - t2.insert().execute( - [dict(("field%d" % fnum, u"value%d" % fnum) - for fnum in range(NUM_FIELDS)) for r_num in range(NUM_RECORDS)] - ) - + t.insert().execute([dict(('field%d' % fnum, u'value%d' % fnum) + for fnum in range(NUM_FIELDS)) for r_num in + range(NUM_RECORDS)]) + t2.insert().execute([dict(('field%d' % fnum, u'value%d' % fnum) + for fnum in range(NUM_FIELDS)) for r_num in + range(NUM_RECORDS)]) + def teardown(self): metadata.drop_all() - - @profiling.function_call_count(14416, versions={'2.4':13214, '2.6+cextension':409}) + + @profiling.function_call_count(14416, versions={'2.4': 13214, + '2.6+cextension': 409}) def test_string(self): [tuple(row) for row in t.select().execute().fetchall()] - # sqlite3 returns native unicode. so shouldn't be an - # increase here. - @profiling.function_call_count(14396, versions={'2.4':13214, '2.6+cextension':409}) + # sqlite3 returns native unicode. so shouldn't be an increase here. + + @profiling.function_call_count(14396, versions={'2.4': 13214, + '2.6+cextension': 409}) def test_unicode(self): [tuple(row) for row in t2.select().execute().fetchall()] - diff --git a/test/aaa_profiling/test_zoomark.py b/test/aaa_profiling/test_zoomark.py index 20dd2597e2..dc990e9839 100644 --- a/test/aaa_profiling/test_zoomark.py +++ b/test/aaa_profiling/test_zoomark.py @@ -1,143 +1,137 @@ """Benchmark for SQLAlchemy. -An adaptation of Robert Brewers' ZooMark speed tests. -""" +An adaptation of Robert Brewers' ZooMark speed tests. """ + import datetime import sys import time from sqlalchemy import * from sqlalchemy.test import * - ITERATIONS = 1 - dbapi_session = engines.ReplayableSession() metadata = None - -class ZooMarkTest(TestBase): - """Runs the ZooMark and squawks if method counts vary from the norm. - Each test has an associated `call_range`, the total number of accepted - function calls made during the test. The count can vary between Python - 2.4 and 2.5. +class ZooMarkTest(TestBase): + """Runs the ZooMark and squawks if method counts vary from the norm. + + Each test has an associated `call_range`, the total number of + accepted function calls made during the test. The count can vary + between Python 2.4 and 2.5. + Unlike a unit test, this is a ordered collection of steps. Running components individually will fail. - + """ __only_on__ = 'postgresql+psycopg2' - __skip_if__ = ((lambda: sys.version_info < (2, 4)), ) + __skip_if__ = lambda : sys.version_info < (2, 4), def test_baseline_0_setup(self): global metadata - creator = testing.db.pool._creator - - recorder = lambda: dbapi_session.recorder(creator()) - engine = engines.testing_engine(options={'creator':recorder}) + recorder = lambda : dbapi_session.recorder(creator()) + engine = engines.testing_engine(options={'creator': recorder}) metadata = MetaData(engine) engine.connect() - + def test_baseline_1_create_tables(self): - Zoo = Table('Zoo', metadata, - Column('ID', Integer, Sequence('zoo_id_seq'), - primary_key=True, index=True), - Column('Name', Unicode(255)), - Column('Founded', Date), - Column('Opens', Time), - Column('LastEscape', DateTime), - Column('Admission', Float), - ) - - Animal = Table('Animal', metadata, - Column('ID', Integer, Sequence('animal_id_seq'), - primary_key=True), - Column('ZooID', Integer, ForeignKey('Zoo.ID'), - index=True), - Column('Name', Unicode(100)), - Column('Species', Unicode(100)), - Column('Legs', Integer, default=4), - Column('LastEscape', DateTime), - Column('Lifespan', Float(4)), - Column('MotherID', Integer, ForeignKey('Animal.ID')), - Column('PreferredFoodID', Integer), - Column('AlternateFoodID', Integer), - ) + Zoo = Table( + 'Zoo', + metadata, + Column('ID', Integer, Sequence('zoo_id_seq'), + primary_key=True, index=True), + Column('Name', Unicode(255)), + Column('Founded', Date), + Column('Opens', Time), + Column('LastEscape', DateTime), + Column('Admission', Float), + ) + Animal = Table( + 'Animal', + metadata, + Column('ID', Integer, Sequence('animal_id_seq'), + primary_key=True), + Column('ZooID', Integer, ForeignKey('Zoo.ID'), index=True), + Column('Name', Unicode(100)), + Column('Species', Unicode(100)), + Column('Legs', Integer, default=4), + Column('LastEscape', DateTime), + Column('Lifespan', Float(4)), + Column('MotherID', Integer, ForeignKey('Animal.ID')), + Column('PreferredFoodID', Integer), + Column('AlternateFoodID', Integer), + ) metadata.create_all() def test_baseline_1a_populate(self): Zoo = metadata.tables['Zoo'] Animal = metadata.tables['Animal'] - wap = Zoo.insert().execute(Name=u'Wild Animal Park', - Founded=datetime.date(2000, 1, 1), - # 59 can give rounding errors with divmod, which - # AdapterFromADO needs to correct. - Opens=datetime.time(8, 15, 59), - LastEscape=datetime.datetime(2004, 7, 29, 5, 6, 7), - Admission=4.95, - ).inserted_primary_key[0] - - sdz = Zoo.insert().execute(Name =u'San Diego Zoo', - Founded = datetime.date(1935, 9, 13), - Opens = datetime.time(9, 0, 0), - Admission = 0, - ).inserted_primary_key[0] - - Zoo.insert(inline=True).execute( - Name = u'Montr\xe9al Biod\xf4me', - Founded = datetime.date(1992, 6, 19), - Opens = datetime.time(9, 0, 0), - Admission = 11.75, - ) - - seaworld = Zoo.insert().execute( - Name =u'Sea_World', Admission = 60).inserted_primary_key[0] + Founded=datetime.date(2000, 1, 1), + Opens=datetime.time(8, 15, 59), + LastEscape= + datetime.datetime(2004, 7, 29, 5, 6, 7), + Admission=4.95).inserted_primary_key[0] + sdz = Zoo.insert().execute(Name=u'San Diego Zoo', + Founded=datetime.date(1935, 9, 13), + Opens=datetime.time(9, 0, 0), + Admission=0).inserted_primary_key[0] + Zoo.insert(inline=True).execute(Name=u'Montr\xe9al Biod\xf4me', + Founded=datetime.date(1992, 6, 19), + Opens=datetime.time(9, 0, 0), Admission=11.75) + seaworld = Zoo.insert().execute(Name=u'Sea_World', + Admission=60).inserted_primary_key[0] # Let's add a crazy futuristic Zoo to test large date values. - lp = Zoo.insert().execute(Name =u'Luna Park', - Founded = datetime.date(2072, 7, 17), - Opens = datetime.time(0, 0, 0), - Admission = 134.95, - ).inserted_primary_key[0] + + lp = Zoo.insert().execute(Name=u'Luna Park', + Founded=datetime.date(2072, 7, 17), + Opens=datetime.time(0, 0, 0), + Admission=134.95).inserted_primary_key[0] # Animals - leopardid = Animal.insert().execute(Species=u'Leopard', Lifespan=73.5, - ).inserted_primary_key[0] - Animal.update(Animal.c.ID==leopardid).execute(ZooID=wap, - LastEscape=datetime.datetime(2004, 12, 21, 8, 15, 0, 999907)) - lion = Animal.insert().execute(Species=u'Lion', ZooID=wap).inserted_primary_key[0] + leopardid = Animal.insert().execute(Species=u'Leopard', + Lifespan=73.5).inserted_primary_key[0] + Animal.update(Animal.c.ID == leopardid).execute(ZooID=wap, + LastEscape=datetime.datetime( 2004, 12, 21, 8, 15, 0, 999907,) + ) + lion = Animal.insert().execute(Species=u'Lion', + ZooID=wap).inserted_primary_key[0] Animal.insert().execute(Species=u'Slug', Legs=1, Lifespan=.75) - - tiger = Animal.insert().execute(Species=u'Tiger', ZooID=sdz - ).inserted_primary_key[0] + tiger = Animal.insert().execute(Species=u'Tiger', + ZooID=sdz).inserted_primary_key[0] # Override Legs.default with itself just to make sure it works. - Animal.insert(inline=True).execute(Species=u'Bear', Legs=4) - Animal.insert(inline=True).execute(Species=u'Ostrich', Legs=2, Lifespan=103.2) - Animal.insert(inline=True).execute(Species=u'Centipede', Legs=100) - - emp = Animal.insert().execute(Species=u'Emperor Penguin', Legs=2, - ZooID=seaworld).inserted_primary_key[0] - adelie = Animal.insert().execute(Species=u'Adelie Penguin', Legs=2, - ZooID=seaworld).inserted_primary_key[0] - Animal.insert(inline=True).execute(Species=u'Millipede', Legs=1000000, ZooID=sdz) + Animal.insert(inline=True).execute(Species=u'Bear', Legs=4) + Animal.insert(inline=True).execute(Species=u'Ostrich', Legs=2, + Lifespan=103.2) + Animal.insert(inline=True).execute(Species=u'Centipede', + Legs=100) + emp = Animal.insert().execute(Species=u'Emperor Penguin', + Legs=2, ZooID=seaworld).inserted_primary_key[0] + adelie = Animal.insert().execute(Species=u'Adelie Penguin', + Legs=2, ZooID=seaworld).inserted_primary_key[0] + Animal.insert(inline=True).execute(Species=u'Millipede', + Legs=1000000, ZooID=sdz) # Add a mother and child to test relationships - bai_yun = Animal.insert().execute(Species=u'Ape', Name=u'Bai Yun', - Legs=2).inserted_primary_key[0] - Animal.insert(inline=True).execute(Species=u'Ape', Name=u'Hua Mei', Legs=2, - MotherID=bai_yun) + + bai_yun = Animal.insert().execute(Species=u'Ape', + Name=u'Bai Yun', Legs=2).inserted_primary_key[0] + Animal.insert(inline=True).execute(Species=u'Ape', + Name=u'Hua Mei', Legs=2, MotherID=bai_yun) def test_baseline_2_insert(self): Animal = metadata.tables['Animal'] i = Animal.insert(inline=True) for x in xrange(ITERATIONS): - tick = i.execute(Species=u'Tick', Name=u'Tick %d' % x, Legs=8) + tick = i.execute(Species=u'Tick', Name=u'Tick %d' % x, + Legs=8) def test_baseline_3_properties(self): Zoo = metadata.tables['Zoo'] @@ -145,20 +139,32 @@ class ZooMarkTest(TestBase): def fullobject(select): """Iterate over the full result row.""" + return list(select.execute().first()) for x in xrange(ITERATIONS): + # Zoos - WAP = fullobject(Zoo.select(Zoo.c.Name==u'Wild Animal Park')) - SDZ = fullobject(Zoo.select(Zoo.c.Founded==datetime.date(1935, 9, 13))) - Biodome = fullobject(Zoo.select(Zoo.c.Name==u'Montr\xe9al Biod\xf4me')) - seaworld = fullobject(Zoo.select(Zoo.c.Admission == float(60))) + + WAP = fullobject(Zoo.select(Zoo.c.Name + == u'Wild Animal Park')) + SDZ = fullobject(Zoo.select(Zoo.c.Founded + == datetime.date(1935, 9, 13))) + Biodome = fullobject(Zoo.select(Zoo.c.Name + == u'Montr\xe9al Biod\xf4me')) + seaworld = fullobject(Zoo.select(Zoo.c.Admission + == float(60))) # Animals - leopard = fullobject(Animal.select(Animal.c.Species ==u'Leopard')) - ostrich = fullobject(Animal.select(Animal.c.Species==u'Ostrich')) - millipede = fullobject(Animal.select(Animal.c.Legs==1000000)) - ticks = fullobject(Animal.select(Animal.c.Species==u'Tick')) + + leopard = fullobject(Animal.select(Animal.c.Species + == u'Leopard')) + ostrich = fullobject(Animal.select(Animal.c.Species + == u'Ostrich')) + millipede = fullobject(Animal.select(Animal.c.Legs + == 1000000)) + ticks = fullobject(Animal.select(Animal.c.Species == u'Tick' + )) def test_baseline_4_expressions(self): Zoo = metadata.tables['Zoo'] @@ -166,117 +172,147 @@ class ZooMarkTest(TestBase): def fulltable(select): """Iterate over the full result table.""" + return [list(row) for row in select.execute().fetchall()] for x in xrange(ITERATIONS): assert len(fulltable(Zoo.select())) == 5 assert len(fulltable(Animal.select())) == ITERATIONS + 12 - assert len(fulltable(Animal.select(Animal.c.Legs==4))) == 4 - assert len(fulltable(Animal.select(Animal.c.Legs == 2))) == 5 - assert len(fulltable(Animal.select(and_(Animal.c.Legs >= 2, Animal.c.Legs < 20) - ))) == ITERATIONS + 9 - assert len(fulltable(Animal.select(Animal.c.Legs > 10))) == 2 - assert len(fulltable(Animal.select(Animal.c.Lifespan > 70))) == 2 - assert len(fulltable(Animal.select(Animal.c.Species.startswith(u'L')))) == 2 - assert len(fulltable(Animal.select(Animal.c.Species.endswith(u'pede')))) == 2 - - assert len(fulltable(Animal.select(Animal.c.LastEscape != None))) == 1 - assert len(fulltable(Animal.select(None == Animal.c.LastEscape - ))) == ITERATIONS + 11 + assert len(fulltable(Animal.select(Animal.c.Legs == 4))) \ + == 4 + assert len(fulltable(Animal.select(Animal.c.Legs == 2))) \ + == 5 + assert len(fulltable(Animal.select(and_(Animal.c.Legs >= 2, + Animal.c.Legs < 20)))) == ITERATIONS + 9 + assert len(fulltable(Animal.select(Animal.c.Legs > 10))) \ + == 2 + assert len(fulltable(Animal.select(Animal.c.Lifespan + > 70))) == 2 + assert len(fulltable(Animal.select(Animal.c.Species. + startswith(u'L')))) == 2 + assert len(fulltable(Animal.select(Animal.c.Species. + endswith(u'pede')))) == 2 + assert len(fulltable(Animal.select(Animal.c.LastEscape + != None))) == 1 + assert len(fulltable(Animal.select(None + == Animal.c.LastEscape))) == ITERATIONS + 11 # In operator (containedby) - assert len(fulltable(Animal.select(Animal.c.Species.like(u'%pede%')))) == 2 - assert len(fulltable(Animal.select(Animal.c.Species.in_([u'Lion', u'Tiger', u'Bear'])))) == 3 + + assert len(fulltable(Animal.select(Animal.c.Species.like(u'%pede%' + )))) == 2 + assert len(fulltable(Animal.select(Animal.c.Species.in_([u'Lion' + , u'Tiger', u'Bear'])))) == 3 # Try In with cell references - class thing(object): pass + class thing(object): + pass + + pet, pet2 = thing(), thing() - pet.Name, pet2.Name =u'Slug', u'Ostrich' - assert len(fulltable(Animal.select(Animal.c.Species.in_([pet.Name, pet2.Name])))) == 2 + pet.Name, pet2.Name = u'Slug', u'Ostrich' + assert len(fulltable(Animal.select(Animal.c.Species.in_([pet.Name, + pet2.Name])))) == 2 # logic and other functions - assert len(fulltable(Animal.select(Animal.c.Species.like(u'Slug')))) == 1 - assert len(fulltable(Animal.select(Animal.c.Species.like(u'%pede%')))) == 2 - name =u'Lion' - assert len(fulltable(Animal.select(func.length(Animal.c.Species) == len(name) - ))) == ITERATIONS + 3 - assert len(fulltable(Animal.select(Animal.c.Species.like(u'%i%') - ))) == ITERATIONS + 7 + assert len(fulltable(Animal.select(Animal.c.Species.like(u'Slug' + )))) == 1 + assert len(fulltable(Animal.select(Animal.c.Species.like(u'%pede%' + )))) == 2 + name = u'Lion' + assert len(fulltable(Animal.select(func.length(Animal.c.Species) + == len(name)))) == ITERATIONS + 3 + assert len(fulltable(Animal.select(Animal.c.Species.like(u'%i%' + )))) == ITERATIONS + 7 # Test now(), today(), year(), month(), day() + assert len(fulltable(Zoo.select(and_(Zoo.c.Founded != None, - Zoo.c.Founded < func.current_timestamp(_type=Date))))) == 3 - assert len(fulltable(Animal.select(Animal.c.LastEscape == func.current_timestamp(_type=Date)))) == 0 - assert len(fulltable(Animal.select(func.date_part('year', Animal.c.LastEscape) == 2004))) == 1 - assert len(fulltable(Animal.select(func.date_part('month', Animal.c.LastEscape) == 12))) == 1 - assert len(fulltable(Animal.select(func.date_part('day', Animal.c.LastEscape) == 21))) == 1 + Zoo.c.Founded + < func.current_timestamp(_type=Date))))) == 3 + assert len(fulltable(Animal.select(Animal.c.LastEscape + == func.current_timestamp(_type=Date)))) == 0 + assert len(fulltable(Animal.select(func.date_part('year', + Animal.c.LastEscape) == 2004))) == 1 + assert len(fulltable(Animal.select(func.date_part('month', + Animal.c.LastEscape) == 12))) == 1 + assert len(fulltable(Animal.select(func.date_part('day', + Animal.c.LastEscape) == 21))) == 1 def test_baseline_5_aggregates(self): Animal = metadata.tables['Animal'] Zoo = metadata.tables['Zoo'] - for x in xrange(ITERATIONS): + # views + view = select([Animal.c.Legs]).execute().fetchall() legs = [x[0] for x in view] legs.sort() - - expected = {'Leopard': 73.5, - 'Slug': .75, - 'Tiger': None, - 'Lion': None, - 'Bear': None, - 'Ostrich': 103.2, - 'Centipede': None, - 'Emperor Penguin': None, - 'Adelie Penguin': None, - 'Millipede': None, - 'Ape': None, - 'Tick': None, - } - for species, lifespan in select([Animal.c.Species, Animal.c.Lifespan] - ).execute().fetchall(): + expected = { + 'Leopard': 73.5, + 'Slug': .75, + 'Tiger': None, + 'Lion': None, + 'Bear': None, + 'Ostrich': 103.2, + 'Centipede': None, + 'Emperor Penguin': None, + 'Adelie Penguin': None, + 'Millipede': None, + 'Ape': None, + 'Tick': None, + } + for species, lifespan in select([Animal.c.Species, + Animal.c.Lifespan]).execute().fetchall(): assert lifespan == expected[species] - expected = [u'Montr\xe9al Biod\xf4me', 'Wild Animal Park'] - e = select([Zoo.c.Name], - and_(Zoo.c.Founded != None, - Zoo.c.Founded <= func.current_timestamp(), - Zoo.c.Founded >= datetime.date(1990, 1, 1))) + e = select([Zoo.c.Name], and_(Zoo.c.Founded != None, + Zoo.c.Founded <= func.current_timestamp(), + Zoo.c.Founded >= datetime.date(1990, 1, 1))) values = [val[0] for val in e.execute().fetchall()] assert set(values) == set(expected) # distinct - legs = [x[0] for x in - select([Animal.c.Legs], distinct=True).execute().fetchall()] + + legs = [x[0] for x in select([Animal.c.Legs], + distinct=True).execute().fetchall()] legs.sort() def test_baseline_6_editing(self): Zoo = metadata.tables['Zoo'] - for x in xrange(ITERATIONS): + # Edit - SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo').execute().first() - Zoo.update(Zoo.c.ID==SDZ['ID']).execute( - Name=u'The San Diego Zoo', - Founded = datetime.date(1900, 1, 1), - Opens = datetime.time(7, 30, 0), - Admission = "35.00") + + SDZ = Zoo.select(Zoo.c.Name == u'San Diego Zoo' + ).execute().first() + Zoo.update(Zoo.c.ID == SDZ['ID' + ]).execute(Name=u'The San Diego Zoo', + Founded=datetime.date(1900, 1, 1), + Opens=datetime.time(7, 30, 0), + Admission='35.00') # Test edits - SDZ = Zoo.select(Zoo.c.Name==u'The San Diego Zoo').execute().first() - assert SDZ['Founded'] == datetime.date(1900, 1, 1), SDZ['Founded'] + + SDZ = Zoo.select(Zoo.c.Name == u'The San Diego Zoo' + ).execute().first() + assert SDZ['Founded'] == datetime.date(1900, 1, 1), \ + SDZ['Founded'] # Change it back - Zoo.update(Zoo.c.ID==SDZ['ID']).execute( - Name =u'San Diego Zoo', - Founded = datetime.date(1935, 9, 13), - Opens = datetime.time(9, 0, 0), - Admission = "0") + + Zoo.update(Zoo.c.ID == SDZ['ID' + ]).execute(Name=u'San Diego Zoo', + Founded=datetime.date(1935, 9, 13), + Opens=datetime.time(9, 0, 0), + Admission='0') # Test re-edits - SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo').execute().first() + + SDZ = Zoo.select(Zoo.c.Name == u'San Diego Zoo' + ).execute().first() assert SDZ['Founded'] == datetime.date(1935, 9, 13) def test_baseline_7_multiview(self): @@ -285,44 +321,43 @@ class ZooMarkTest(TestBase): def fulltable(select): """Iterate over the full result table.""" + return [list(row) for row in select.execute().fetchall()] for x in xrange(ITERATIONS): za = fulltable(select([Zoo.c.ID] + list(Animal.c), - Zoo.c.Name ==u'San Diego Zoo', - from_obj = [join(Zoo, Animal)])) - - SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo') - + Zoo.c.Name == u'San Diego Zoo', + from_obj=[join(Zoo, Animal)])) + SDZ = Zoo.select(Zoo.c.Name == u'San Diego Zoo') e = fulltable(select([Zoo.c.ID, Animal.c.ID], - and_(Zoo.c.Name==u'San Diego Zoo', - Animal.c.Species==u'Leopard'), - from_obj = [join(Zoo, Animal)])) + and_(Zoo.c.Name == u'San Diego Zoo', + Animal.c.Species == u'Leopard'), + from_obj=[join(Zoo, Animal)])) # Now try the same query with INNER, LEFT, and RIGHT JOINs. + e = fulltable(select([Zoo.c.Name, Animal.c.Species], - from_obj=[join(Zoo, Animal)])) + from_obj=[join(Zoo, Animal)])) e = fulltable(select([Zoo.c.Name, Animal.c.Species], - from_obj=[outerjoin(Zoo, Animal)])) + from_obj=[outerjoin(Zoo, Animal)])) e = fulltable(select([Zoo.c.Name, Animal.c.Species], - from_obj=[outerjoin(Animal, Zoo)])) + from_obj=[outerjoin(Animal, Zoo)])) def test_baseline_8_drop(self): metadata.drop_all() - # Now, run all of these tests again with the DB-API driver factored out: - # the ReplayableSession playback stands in for the database. - + # Now, run all of these tests again with the DB-API driver factored + # out: the ReplayableSession playback stands in for the database. + # # How awkward is this in a unittest framework? Very. def test_profile_0(self): global metadata - - player = lambda: dbapi_session.player() + player = lambda : dbapi_session.player() engine = create_engine('postgresql:///', creator=player) metadata = MetaData(engine) engine.connect() - + @profiling.function_call_count(3012, {'2.4': 1827}) def test_profile_1_create_tables(self): self.test_baseline_1_create_tables() @@ -339,17 +374,15 @@ class ZooMarkTest(TestBase): def test_profile_3_properties(self): self.test_baseline_3_properties() - @profiling.function_call_count(13341, {'2.4': 7963, - '2.6+cextension':12447, - '2.7+cextension':12447}, - variance=0.10) + @profiling.function_call_count(13341, {'2.4': 7963, '2.6+cextension' + : 12447, '2.7+cextension': 12447}, + variance=0.10) def test_profile_4_expressions(self): self.test_baseline_4_expressions() - @profiling.function_call_count(1311, {'2.4': 904, - '2.6+cextension':1226, - '2.7+cextension':1226, - }, variance=0.10) + @profiling.function_call_count(1311, {'2.4': 904, '2.6+cextension' + : 1226, '2.7+cextension': 1226}, + variance=0.10) def test_profile_5_aggregates(self): self.test_baseline_5_aggregates() @@ -357,11 +390,10 @@ class ZooMarkTest(TestBase): def test_profile_6_editing(self): self.test_baseline_6_editing() - @profiling.function_call_count(2641, {'2.4': 1673, '2.6+cextension':2502}) + @profiling.function_call_count(2641, {'2.4': 1673, '2.6+cextension' + : 2502}) def test_profile_7_multiview(self): self.test_baseline_7_multiview() def test_profile_8_drop(self): self.test_baseline_8_drop() - - diff --git a/test/aaa_profiling/test_zoomark_orm.py b/test/aaa_profiling/test_zoomark_orm.py index 9884f7fb4f..0b699eeade 100644 --- a/test/aaa_profiling/test_zoomark_orm.py +++ b/test/aaa_profiling/test_zoomark_orm.py @@ -1,7 +1,7 @@ """Benchmark for SQLAlchemy. -An adaptation of Robert Brewers' ZooMark speed tests. -""" +An adaptation of Robert Brewers' ZooMark speed tests. """ + import datetime import sys @@ -9,284 +9,328 @@ import time from sqlalchemy import * from sqlalchemy.orm import * from sqlalchemy.test import * - ITERATIONS = 1 - dbapi_session = engines.ReplayableSession() metadata = None -class ZooMarkTest(TestBase): - """Runs the ZooMark and squawks if method counts vary from the norm. - Each test has an associated `call_range`, the total number of accepted - function calls made during the test. The count can vary between Python - 2.4 and 2.5. +class ZooMarkTest(TestBase): + """Runs the ZooMark and squawks if method counts vary from the norm. + + Each test has an associated `call_range`, the total number of + accepted function calls made during the test. The count can vary + between Python 2.4 and 2.5. + Unlike a unit test, this is a ordered collection of steps. Running components individually will fail. - + """ __only_on__ = 'postgresql+psycopg2' - __skip_if__ = ((lambda: sys.version_info < (2, 5)), ) # TODO: get 2.4 support + __skip_if__ = lambda : sys.version_info < (2, 5), # TODO: get 2.4 + # support def test_baseline_0_setup(self): global metadata, session - creator = testing.db.pool._creator - recorder = lambda: dbapi_session.recorder(creator()) - engine = engines.testing_engine(options={'creator':recorder}) + recorder = lambda : dbapi_session.recorder(creator()) + engine = engines.testing_engine(options={'creator': recorder}) metadata = MetaData(engine) session = sessionmaker()() engine.connect() - + def test_baseline_1_create_tables(self): - zoo = Table('Zoo', metadata, - Column('ID', Integer, Sequence('zoo_id_seq'), - primary_key=True, index=True), - Column('Name', Unicode(255)), - Column('Founded', Date), - Column('Opens', Time), - Column('LastEscape', DateTime), - Column('Admission', Float), - ) - - animal = Table('Animal', metadata, - Column('ID', Integer, Sequence('animal_id_seq'), - primary_key=True), - Column('ZooID', Integer, ForeignKey('Zoo.ID'), - index=True), - Column('Name', Unicode(100)), - Column('Species', Unicode(100)), - Column('Legs', Integer, default=4), - Column('LastEscape', DateTime), - Column('Lifespan', Float(4)), - Column('MotherID', Integer, ForeignKey('Animal.ID')), - Column('PreferredFoodID', Integer), - Column('AlternateFoodID', Integer), - ) + zoo = Table( + 'Zoo', + metadata, + Column('ID', Integer, Sequence('zoo_id_seq'), + primary_key=True, index=True), + Column('Name', Unicode(255)), + Column('Founded', Date), + Column('Opens', Time), + Column('LastEscape', DateTime), + Column('Admission', Float), + ) + animal = Table( + 'Animal', + metadata, + Column('ID', Integer, Sequence('animal_id_seq'), + primary_key=True), + Column('ZooID', Integer, ForeignKey('Zoo.ID'), index=True), + Column('Name', Unicode(100)), + Column('Species', Unicode(100)), + Column('Legs', Integer, default=4), + Column('LastEscape', DateTime), + Column('Lifespan', Float(4)), + Column('MotherID', Integer, ForeignKey('Animal.ID')), + Column('PreferredFoodID', Integer), + Column('AlternateFoodID', Integer), + ) metadata.create_all() - global Zoo, Animal + + class Zoo(object): + def __init__(self, **kwargs): for k, v in kwargs.iteritems(): setattr(self, k, v) + + class Animal(object): + def __init__(self, **kwargs): for k, v in kwargs.iteritems(): setattr(self, k, v) + + mapper(Zoo, zoo) mapper(Animal, animal) - + def test_baseline_1a_populate(self): - wap = Zoo(Name=u'Wild Animal Park', - Founded=datetime.date(2000, 1, 1), - # 59 can give rounding errors with divmod, which - # AdapterFromADO needs to correct. - Opens=datetime.time(8, 15, 59), - LastEscape=datetime.datetime(2004, 7, 29, 5, 6, 7), - Admission=4.95, - ) + wap = Zoo(Name=u'Wild Animal Park', Founded=datetime.date(2000, + 1, 1), Opens=datetime.time(8, 15, 59), + LastEscape=datetime.datetime( 2004, 7, 29, 5, 6, 7, ), + Admission=4.95) session.add(wap) - sdz = Zoo(Name =u'San Diego Zoo', - # This early date should play havoc with a number - # of implementations. - Founded = datetime.date(1835, 9, 13), - Opens = datetime.time(9, 0, 0), - Admission = 0, - ) + sdz = Zoo(Name=u'San Diego Zoo', Founded=datetime.date(1835, 9, + 13), Opens=datetime.time(9, 0, 0), Admission=0) session.add(sdz) - - bio = Zoo( - Name = u'Montr\xe9al Biod\xf4me', - Founded = datetime.date(1992, 6, 19), - Opens = datetime.time(9, 0, 0), - Admission = 11.75, - ) + bio = Zoo(Name=u'Montr\xe9al Biod\xf4me', + Founded=datetime.date(1992, 6, 19), + Opens=datetime.time(9, 0, 0), Admission=11.75) session.add(bio) - - seaworld = Zoo( - Name =u'Sea_World', Admission = 60) + seaworld = Zoo(Name=u'Sea_World', Admission=60) session.add(seaworld) - + # Let's add a crazy futuristic Zoo to test large date values. - lp = Zoo(Name =u'Luna Park', - Founded = datetime.date(2072, 7, 17), - Opens = datetime.time(0, 0, 0), - Admission = 134.95, - ) + + lp = Zoo(Name=u'Luna Park', Founded=datetime.date(2072, 7, 17), + Opens=datetime.time(0, 0, 0), Admission=134.95) session.add(lp) session.flush() - + # Animals - leopard = Animal(Species=u'Leopard', Lifespan=73.5,) + + leopard = Animal(Species=u'Leopard', Lifespan=73.5) session.add(leopard) leopard.ZooID = wap.ID - leopard.LastEscape = datetime.datetime(2004, 12, 21, 8, 15, 0, 999907) - + leopard.LastEscape = \ + datetime.datetime(2004, 12, 21, 8, 15, 0, 999907, ) session.add(Animal(Species=u'Lion', ZooID=wap.ID)) session.add(Animal(Species=u'Slug', Legs=1, Lifespan=.75)) session.add(Animal(Species=u'Tiger', ZooID=sdz.ID)) - + # Override Legs.default with itself just to make sure it works. + session.add(Animal(Species=u'Bear', Legs=4)) session.add(Animal(Species=u'Ostrich', Legs=2, Lifespan=103.2)) session.add(Animal(Species=u'Centipede', Legs=100)) - - session.add(Animal(Species=u'Emperor Penguin', Legs=2, ZooID=seaworld.ID)) - session.add(Animal(Species=u'Adelie Penguin', Legs=2, ZooID=seaworld.ID)) - - session.add(Animal(Species=u'Millipede', Legs=1000000, ZooID=sdz.ID)) - + session.add(Animal(Species=u'Emperor Penguin', Legs=2, + ZooID=seaworld.ID)) + session.add(Animal(Species=u'Adelie Penguin', Legs=2, + ZooID=seaworld.ID)) + session.add(Animal(Species=u'Millipede', Legs=1000000, + ZooID=sdz.ID)) + # Add a mother and child to test relationships + bai_yun = Animal(Species=u'Ape', Nameu=u'Bai Yun', Legs=2) session.add(bai_yun) session.add(Animal(Species=u'Ape', Name=u'Hua Mei', Legs=2, - MotherID=bai_yun.ID)) + MotherID=bai_yun.ID)) session.flush() session.commit() def test_baseline_2_insert(self): for x in xrange(ITERATIONS): - session.add(Animal(Species=u'Tick', Name=u'Tick %d' % x, Legs=8)) + session.add(Animal(Species=u'Tick', Name=u'Tick %d' % x, + Legs=8)) session.flush() def test_baseline_3_properties(self): for x in xrange(ITERATIONS): + # Zoos - WAP = list(session.query(Zoo).filter(Zoo.Name==u'Wild Animal Park')) - SDZ = list(session.query(Zoo).filter(Zoo.Founded==datetime.date(1835, 9, 13))) - Biodome = list(session.query(Zoo).filter(Zoo.Name==u'Montr\xe9al Biod\xf4me')) - seaworld = list(session.query(Zoo).filter(Zoo.Admission == float(60))) - + + WAP = list(session.query(Zoo).filter(Zoo.Name + == u'Wild Animal Park')) + SDZ = list(session.query(Zoo).filter(Zoo.Founded + == datetime.date(1835, 9, 13))) + Biodome = list(session.query(Zoo).filter(Zoo.Name + == u'Montr\xe9al Biod\xf4me')) + seaworld = list(session.query(Zoo).filter(Zoo.Admission + == float(60))) + # Animals - leopard = list(session.query(Animal).filter(Animal.Species == u'Leopard')) - ostrich = list(session.query(Animal).filter(Animal.Species==u'Ostrich')) - millipede = list(session.query(Animal).filter(Animal.Legs==1000000)) - ticks = list(session.query(Animal).filter(Animal.Species==u'Tick')) + + leopard = list(session.query(Animal).filter(Animal.Species + == u'Leopard')) + ostrich = list(session.query(Animal).filter(Animal.Species + == u'Ostrich')) + millipede = list(session.query(Animal).filter(Animal.Legs + == 1000000)) + ticks = list(session.query(Animal).filter(Animal.Species + == u'Tick')) def test_baseline_4_expressions(self): for x in xrange(ITERATIONS): assert len(list(session.query(Zoo))) == 5 assert len(list(session.query(Animal))) == ITERATIONS + 12 - assert len(list(session.query(Animal).filter(Animal.Legs==4))) == 4 - assert len(list(session.query(Animal).filter(Animal.Legs == 2))) == 5 - assert len(list(session.query(Animal).filter(and_(Animal.Legs >= 2, Animal.Legs < 20)))) == ITERATIONS + 9 - assert len(list(session.query(Animal).filter(Animal.Legs > 10))) == 2 - assert len(list(session.query(Animal).filter(Animal.Lifespan > 70))) == 2 - assert len(list(session.query(Animal).filter(Animal.Species.like(u'L%')))) == 2 - assert len(list(session.query(Animal).filter(Animal.Species.like(u'%pede')))) == 2 - - assert len(list(session.query(Animal).filter(Animal.LastEscape != None))) == 1 - assert len(list(session.query(Animal).filter(Animal.LastEscape == None))) == ITERATIONS + 11 + assert len(list(session.query(Animal).filter(Animal.Legs + == 4))) == 4 + assert len(list(session.query(Animal).filter(Animal.Legs + == 2))) == 5 + assert len(list(session.query(Animal).filter(and_(Animal.Legs + >= 2, Animal.Legs < 20)))) == ITERATIONS + 9 + assert len(list(session.query(Animal).filter(Animal.Legs + > 10))) == 2 + assert len(list(session.query(Animal).filter(Animal.Lifespan + > 70))) == 2 + assert len(list(session.query(Animal). + filter(Animal.Species.like(u'L%')))) == 2 + assert len(list(session.query(Animal). + filter(Animal.Species.like(u'%pede')))) == 2 + assert len(list(session.query(Animal).filter(Animal.LastEscape + != None))) == 1 + assert len(list(session.query(Animal).filter(Animal.LastEscape + == None))) == ITERATIONS + 11 # In operator (containedby) - assert len(list(session.query(Animal).filter(Animal.Species.like(u'%pede%')))) == 2 + assert len(list(session.query(Animal).filter( - Animal.Species.in_((u'Lion', u'Tiger', u'Bear'))))) == 3 + Animal.Species.like(u'%pede%')))) == 2 + assert len(list(session.query(Animal). + filter(Animal.Species.in_((u'Lion' + , u'Tiger', u'Bear'))))) == 3 # Try In with cell references - class thing(object): pass + class thing(object): + pass + pet, pet2 = thing(), thing() pet.Name, pet2.Name = u'Slug', u'Ostrich' - assert len(list(session.query(Animal).filter(Animal.Species.in_((pet.Name, pet2.Name))))) == 2 + assert len(list(session.query(Animal). + filter(Animal.Species.in_((pet.Name, + pet2.Name))))) == 2 # logic and other functions - name =u'Lion' - assert len(list(session.query(Animal).filter(func.length(Animal.Species) == len(name)))) == ITERATIONS + 3 - assert len(list(session.query(Animal).filter(Animal.Species.like(u'%i%')))) == ITERATIONS + 7 + name = u'Lion' + assert len(list(session.query(Animal). + filter(func.length(Animal.Species) + == len(name)))) == ITERATIONS + 3 + assert len(list(session.query(Animal). + filter(Animal.Species.like(u'%i%' + )))) == ITERATIONS + 7 # Test now(), today(), year(), month(), day() - assert len(list(session.query(Zoo).filter(and_(Zoo.Founded != None, Zoo.Founded < func.now())))) == 3 - assert len(list(session.query(Animal).filter(Animal.LastEscape == func.now()))) == 0 - assert len(list(session.query(Animal).filter(func.date_part('year', Animal.LastEscape) == 2004))) == 1 - assert len(list(session.query(Animal).filter(func.date_part('month', Animal.LastEscape) == 12))) == 1 - assert len(list(session.query(Animal).filter(func.date_part('day', Animal.LastEscape) == 21))) == 1 + + assert len(list(session.query(Zoo).filter(and_(Zoo.Founded + != None, Zoo.Founded < func.now())))) == 3 + assert len(list(session.query(Animal).filter(Animal.LastEscape + == func.now()))) == 0 + assert len(list(session.query(Animal).filter(func.date_part('year' + , Animal.LastEscape) == 2004))) == 1 + assert len(list(session.query(Animal). + filter(func.date_part('month' + , Animal.LastEscape) == 12))) == 1 + assert len(list(session.query(Animal).filter(func.date_part('day' + , Animal.LastEscape) == 21))) == 1 def test_baseline_5_aggregates(self): Animal = metadata.tables['Animal'] Zoo = metadata.tables['Zoo'] - + # TODO: convert to ORM + for x in xrange(ITERATIONS): + # views + view = select([Animal.c.Legs]).execute().fetchall() legs = [x[0] for x in view] legs.sort() - - expected = {'Leopard': 73.5, - 'Slug': .75, - 'Tiger': None, - 'Lion': None, - 'Bear': None, - 'Ostrich': 103.2, - 'Centipede': None, - 'Emperor Penguin': None, - 'Adelie Penguin': None, - 'Millipede': None, - 'Ape': None, - 'Tick': None, - } - for species, lifespan in select([Animal.c.Species, Animal.c.Lifespan] - ).execute().fetchall(): + expected = { + 'Leopard': 73.5, + 'Slug': .75, + 'Tiger': None, + 'Lion': None, + 'Bear': None, + 'Ostrich': 103.2, + 'Centipede': None, + 'Emperor Penguin': None, + 'Adelie Penguin': None, + 'Millipede': None, + 'Ape': None, + 'Tick': None, + } + for species, lifespan in select([Animal.c.Species, + Animal.c.Lifespan]).execute().fetchall(): assert lifespan == expected[species] - expected = [u'Montr\xe9al Biod\xf4me', 'Wild Animal Park'] - e = select([Zoo.c.Name], - and_(Zoo.c.Founded != None, - Zoo.c.Founded <= func.current_timestamp(), - Zoo.c.Founded >= datetime.date(1990, 1, 1))) + e = select([Zoo.c.Name], and_(Zoo.c.Founded != None, + Zoo.c.Founded <= func.current_timestamp(), + Zoo.c.Founded >= datetime.date(1990, 1, 1))) values = [val[0] for val in e.execute().fetchall()] assert set(values) == set(expected) # distinct - legs = [x[0] for x in - select([Animal.c.Legs], distinct=True).execute().fetchall()] + + legs = [x[0] for x in select([Animal.c.Legs], + distinct=True).execute().fetchall()] legs.sort() def test_baseline_6_editing(self): for x in xrange(ITERATIONS): + # Edit - SDZ = session.query(Zoo).filter(Zoo.Name==u'San Diego Zoo').one() + + SDZ = session.query(Zoo).filter(Zoo.Name == u'San Diego Zoo' + ).one() SDZ.Name = u'The San Diego Zoo' SDZ.Founded = datetime.date(1900, 1, 1) SDZ.Opens = datetime.time(7, 30, 0) SDZ.Admission = 35.00 - + # Test edits - SDZ = session.query(Zoo).filter(Zoo.Name==u'The San Diego Zoo').one() + + SDZ = session.query(Zoo).filter(Zoo.Name + == u'The San Diego Zoo').one() assert SDZ.Founded == datetime.date(1900, 1, 1), SDZ.Founded - + # Change it back + SDZ.Name = u'San Diego Zoo' SDZ.Founded = datetime.date(1835, 9, 13) SDZ.Opens = datetime.time(9, 0, 0) SDZ.Admission = 0 - + # Test re-edits - SDZ = session.query(Zoo).filter(Zoo.Name==u'San Diego Zoo').one() - assert SDZ.Founded == datetime.date(1835, 9, 13), SDZ.Founded + + SDZ = session.query(Zoo).filter(Zoo.Name == u'San Diego Zoo' + ).one() + assert SDZ.Founded == datetime.date(1835, 9, 13), \ + SDZ.Founded def test_baseline_7_drop(self): session.rollback() metadata.drop_all() - # Now, run all of these tests again with the DB-API driver factored out: - # the ReplayableSession playback stands in for the database. - + # Now, run all of these tests again with the DB-API driver factored + # out: the ReplayableSession playback stands in for the database. + # # How awkward is this in a unittest framework? Very. def test_profile_0(self): global metadata, session - - player = lambda: dbapi_session.player() + player = lambda : dbapi_session.player() engine = create_engine('postgresql:///', creator=player) metadata = MetaData(engine) session = sessionmaker()() engine.connect() - + @profiling.function_call_count(4898) def test_profile_1_create_tables(self): self.test_baseline_1_create_tables() @@ -300,20 +344,25 @@ class ZooMarkTest(TestBase): self.test_baseline_2_insert() # this number... - @profiling.function_call_count(6783, {'2.6':7194, '2.7':7298, - '2.7+cextension':7288, - '2.6+cextension':7184}) + + @profiling.function_call_count(6783, { + '2.6': 7194, + '2.7': 7298, + '2.7+cextension': 7288, + '2.6+cextension': 7184, + }) def test_profile_3_properties(self): self.test_baseline_3_properties() # and this number go down slightly when using the C extensions - @profiling.function_call_count(22510, {'2.6':24055, '2.7':24214}) + + @profiling.function_call_count(22510, {'2.6': 24055, '2.7': 24214}) def test_profile_4_expressions(self): self.test_baseline_4_expressions() - @profiling.function_call_count(1313, {'2.6+cextension':1236, - '2.7+cextension':1207}, - variance=0.1) + @profiling.function_call_count(1313, {'2.6+cextension': 1236, + '2.7+cextension': 1207}, + variance=0.1) def test_profile_5_aggregates(self): self.test_baseline_5_aggregates() @@ -323,5 +372,3 @@ class ZooMarkTest(TestBase): def test_profile_7_drop(self): self.test_baseline_7_drop() - - diff --git a/test/engine/test_bind.py b/test/engine/test_bind.py index c958f6f4c5..547afc64cf 100644 --- a/test/engine/test_bind.py +++ b/test/engine/test_bind.py @@ -33,24 +33,18 @@ class BindTest(testing.TestBase): def test_create_drop_err_metadata(self): metadata = MetaData() - table = Table('test_table', metadata, - Column('foo', Integer)) - - for meth in [ - metadata.create_all, - metadata.drop_all, - ]: + table = Table('test_table', metadata, Column('foo', Integer)) + for meth in [metadata.create_all, metadata.drop_all]: try: meth() assert False except exc.UnboundExecutionError, e: - eq_( - str(e), - "The MetaData " - "is not bound to an Engine or Connection. " - "Execution can not proceed without a database to execute " - "against. Either execute with an explicit connection or " - "assign the MetaData's .bind to enable implicit execution.") + eq_(str(e), + "The MetaData is not bound to an Engine or " + "Connection. Execution can not proceed without a " + "database to execute against. Either execute with " + "an explicit connection or assign the MetaData's " + ".bind to enable implicit execution.") def test_create_drop_err_table(self): metadata = MetaData() @@ -153,7 +147,8 @@ class BindTest(testing.TestBase): table.insert().execute(foo=7) trans.rollback() metadata.bind = None - assert conn.execute("select count(1) from test_table").scalar() == 0 + assert conn.execute('select count(1) from test_table' + ).scalar() == 0 finally: metadata.drop_all(bind=conn) @@ -167,7 +162,7 @@ class BindTest(testing.TestBase): for elem in [ table.select, lambda **kwargs: sa.func.current_timestamp(**kwargs).select(), -# func.current_timestamp().select, + # func.current_timestamp().select, lambda **kwargs:text("select * from test_table", **kwargs) ]: for bind in ( @@ -188,12 +183,13 @@ class BindTest(testing.TestBase): e.execute() assert False except exc.UnboundExecutionError, e: - assert str(e).endswith( - 'is not bound and does not support direct ' - 'execution. Supply this statement to a Connection or ' - 'Engine for execution. Or, assign a bind to the ' - 'statement or the Metadata of its underlying tables to ' - 'enable implicit execution via this method.') + assert str(e).endswith('is not bound and does not ' + 'support direct execution. Supply this ' + 'statement to a Connection or Engine for ' + 'execution. Or, assign a bind to the ' + 'statement or the Metadata of its ' + 'underlying tables to enable implicit ' + 'execution via this method.') finally: if isinstance(bind, engine.Connection): bind.close() diff --git a/test/engine/test_ddlevents.py b/test/engine/test_ddlevents.py index 06d2935262..ccbbfd82d0 100644 --- a/test/engine/test_ddlevents.py +++ b/test/engine/test_ddlevents.py @@ -1,5 +1,6 @@ from sqlalchemy.test.testing import assert_raises, assert_raises_message -from sqlalchemy.schema import DDL, CheckConstraint, AddConstraint, DropConstraint +from sqlalchemy.schema import DDL, CheckConstraint, AddConstraint, \ + DropConstraint from sqlalchemy import create_engine from sqlalchemy import MetaData, Integer, String from sqlalchemy.test.schema import Table @@ -158,13 +159,12 @@ class DDLEventTest(TestBase): def test_metadata_table_isolation(self): metadata, table, bind = self.metadata, self.table, self.bind - table_canary = self.Canary(table, bind) - table.ddl_listeners['before-create'].append(table_canary.before_create) - + table.ddl_listeners['before-create' + ].append(table_canary.before_create) metadata_canary = self.Canary(metadata, bind) - metadata.ddl_listeners['before-create'].append(metadata_canary.before_create) - + metadata.ddl_listeners['before-create' + ].append(metadata_canary.before_create) self.table.create(self.bind) assert metadata_canary.state == None @@ -235,27 +235,28 @@ class DDLExecutionTest(TestBase): metadata, users, engine = self.metadata, self.users, self.engine nonpg_mock = engines.mock_engine(dialect_name='sqlite') pg_mock = engines.mock_engine(dialect_name='postgresql') - - constraint = CheckConstraint('a < b',name="my_test_constraint", table=users) + constraint = CheckConstraint('a < b', name='my_test_constraint' + , table=users) - # by placing the constraint in an Add/Drop construct, - # the 'inline_ddl' flag is set to False - AddConstraint(constraint, on='postgresql').execute_at("after-create", users) - DropConstraint(constraint, on='postgresql').execute_at("before-drop", users) - + # by placing the constraint in an Add/Drop construct, the + # 'inline_ddl' flag is set to False + + AddConstraint(constraint, on='postgresql' + ).execute_at('after-create', users) + DropConstraint(constraint, on='postgresql' + ).execute_at('before-drop', users) metadata.create_all(bind=nonpg_mock) - strings = " ".join(str(x) for x in nonpg_mock.mock) - assert "my_test_constraint" not in strings + strings = ' '.join(str(x) for x in nonpg_mock.mock) + assert 'my_test_constraint' not in strings metadata.drop_all(bind=nonpg_mock) - strings = " ".join(str(x) for x in nonpg_mock.mock) - assert "my_test_constraint" not in strings - + strings = ' '.join(str(x) for x in nonpg_mock.mock) + assert 'my_test_constraint' not in strings metadata.create_all(bind=pg_mock) - strings = " ".join(str(x) for x in pg_mock.mock) - assert "my_test_constraint" in strings + strings = ' '.join(str(x) for x in pg_mock.mock) + assert 'my_test_constraint' in strings metadata.drop_all(bind=pg_mock) - strings = " ".join(str(x) for x in pg_mock.mock) - assert "my_test_constraint" in strings + strings = ' '.join(str(x) for x in pg_mock.mock) + assert 'my_test_constraint' in strings def test_metadata(self): metadata, engine = self.metadata, self.engine @@ -327,24 +328,32 @@ class DDLTest(TestBase, AssertsCompiledSQL): sane_alone = Table('t', m, Column('id', Integer)) sane_schema = Table('t', m, Column('id', Integer), schema='s') insane_alone = Table('t t', m, Column('id', Integer)) - insane_schema = Table('t t', m, Column('id', Integer), schema='s s') - + insane_schema = Table('t t', m, Column('id', Integer), + schema='s s') ddl = DDL('%(schema)s-%(table)s-%(fullname)s') - dialect = self.mock_engine().dialect - self.assert_compile(ddl.against(sane_alone), '-t-t', dialect=dialect) - self.assert_compile(ddl.against(sane_schema), 's-t-s.t', dialect=dialect) - self.assert_compile(ddl.against(insane_alone), '-"t t"-"t t"', dialect=dialect) - self.assert_compile(ddl.against(insane_schema), '"s s"-"t t"-"s s"."t t"', dialect=dialect) + self.assert_compile(ddl.against(sane_alone), '-t-t', + dialect=dialect) + self.assert_compile(ddl.against(sane_schema), 's-t-s.t', + dialect=dialect) + self.assert_compile(ddl.against(insane_alone), '-"t t"-"t t"', + dialect=dialect) + self.assert_compile(ddl.against(insane_schema), + '"s s"-"t t"-"s s"."t t"', dialect=dialect) # overrides are used piece-meal and verbatim. - ddl = DDL('%(schema)s-%(table)s-%(fullname)s-%(bonus)s', - context={'schema':'S S', 'table': 'T T', 'bonus': 'b'}) - self.assert_compile(ddl.against(sane_alone), 'S S-T T-t-b', dialect=dialect) - self.assert_compile(ddl.against(sane_schema), 'S S-T T-s.t-b', dialect=dialect) - self.assert_compile(ddl.against(insane_alone), 'S S-T T-"t t"-b', dialect=dialect) - self.assert_compile(ddl.against(insane_schema), 'S S-T T-"s s"."t t"-b', dialect=dialect) + ddl = DDL('%(schema)s-%(table)s-%(fullname)s-%(bonus)s', + context={'schema': 'S S', 'table': 'T T', 'bonus': 'b' + }) + self.assert_compile(ddl.against(sane_alone), 'S S-T T-t-b', + dialect=dialect) + self.assert_compile(ddl.against(sane_schema), 'S S-T T-s.t-b', + dialect=dialect) + self.assert_compile(ddl.against(insane_alone), 'S S-T T-"t t"-b' + , dialect=dialect) + self.assert_compile(ddl.against(insane_schema), + 'S S-T T-"s s"."t t"-b', dialect=dialect) def test_filter(self): cx = self.mock_engine() diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 6b2ba2010c..47879ece9e 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1,11 +1,13 @@ from sqlalchemy.test.testing import eq_, assert_raises import re from sqlalchemy.interfaces import ConnectionProxy -from sqlalchemy import MetaData, Integer, String, INT, VARCHAR, func, bindparam, select +from sqlalchemy import MetaData, Integer, String, INT, VARCHAR, func, \ + bindparam, select from sqlalchemy.test.schema import Table, Column import sqlalchemy as tsa from sqlalchemy.test import TestBase, testing, engines import logging +from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam users, metadata = None, None class ExecuteTest(TestBase): @@ -18,7 +20,8 @@ class ExecuteTest(TestBase): Column('user_name', VARCHAR(20)), ) users_autoinc = Table('users_autoinc', metadata, - Column('user_id', INT, primary_key = True, test_needs_autoincrement=True), + Column('user_id', INT, primary_key = True, + test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), ) metadata.create_all() @@ -35,94 +38,116 @@ class ExecuteTest(TestBase): 'sqlite', '+pyodbc', '+mxodbc', '+zxjdbc', 'mysql+oursql') def test_raw_qmark(self): - for conn in (testing.db, testing.db.connect()): - conn.execute("insert into users (user_id, user_name) values (?, ?)", (1,"jack")) - conn.execute("insert into users (user_id, user_name) values (?, ?)", [2,"fred"]) - conn.execute("insert into users (user_id, user_name) values (?, ?)", - [3,"ed"], - [4,"horse"]) - conn.execute("insert into users (user_id, user_name) values (?, ?)", - (5,"barney"), (6,"donkey")) - conn.execute("insert into users (user_id, user_name) values (?, ?)", 7, 'sally') - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [(1, "jack"), (2, "fred"), - (3, "ed"), (4, "horse"), - (5, "barney"), (6, "donkey"), - (7, 'sally')] - conn.execute("delete from users") + for conn in testing.db, testing.db.connect(): + conn.execute('insert into users (user_id, user_name) ' + 'values (?, ?)', (1, 'jack')) + conn.execute('insert into users (user_id, user_name) ' + 'values (?, ?)', [2, 'fred']) + conn.execute('insert into users (user_id, user_name) ' + 'values (?, ?)', [3, 'ed'], [4, 'horse']) + conn.execute('insert into users (user_id, user_name) ' + 'values (?, ?)', (5, 'barney'), (6, 'donkey')) + conn.execute('insert into users (user_id, user_name) ' + 'values (?, ?)', 7, 'sally') + res = conn.execute('select * from users order by user_id') + assert res.fetchall() == [ + (1, 'jack'), + (2, 'fred'), + (3, 'ed'), + (4, 'horse'), + (5, 'barney'), + (6, 'donkey'), + (7, 'sally'), + ] + conn.execute('delete from users') - @testing.fails_on_everything_except('mysql+mysqldb', 'mysql+mysqlconnector', 'postgresql') - @testing.fails_on('postgresql+zxjdbc', 'sprintf not supported') # some psycopg2 versions bomb this. + @testing.fails_on_everything_except('mysql+mysqldb', + 'mysql+mysqlconnector', 'postgresql') + @testing.fails_on('postgresql+zxjdbc', 'sprintf not supported') def test_raw_sprintf(self): - for conn in (testing.db, testing.db.connect()): - conn.execute("insert into users (user_id, user_name) values (%s, %s)", [1,"jack"]) - conn.execute("insert into users (user_id, user_name) values (%s, %s)", - [2,"ed"], - [3,"horse"]) - conn.execute("insert into users (user_id, user_name) values (%s, %s)", 4, 'sally') - conn.execute("insert into users (user_id) values (%s)", 5) - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [(1, "jack"), (2, "ed"), - (3, "horse"), (4, 'sally'), - (5, None)] - conn.execute("delete from users") + for conn in testing.db, testing.db.connect(): + conn.execute('insert into users (user_id, user_name) ' + 'values (%s, %s)', [1, 'jack']) + conn.execute('insert into users (user_id, user_name) ' + 'values (%s, %s)', [2, 'ed'], [3, 'horse']) + conn.execute('insert into users (user_id, user_name) ' + 'values (%s, %s)', 4, 'sally') + conn.execute('insert into users (user_id) values (%s)', 5) + res = conn.execute('select * from users order by user_id') + assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3, + 'horse'), (4, 'sally'), (5, None)] + conn.execute('delete from users') # pyformat is supported for mysql, but skipping because a few driver - # versions have a bug that bombs out on this test. (1.2.2b3, 1.2.2c1, 1.2.2) - @testing.skip_if(lambda: testing.against('mysql+mysqldb'), 'db-api flaky') - @testing.fails_on_everything_except('postgresql+psycopg2', - 'postgresql+pypostgresql', 'mysql+mysqlconnector') + # versions have a bug that bombs out on this test. (1.2.2b3, + # 1.2.2c1, 1.2.2) + + @testing.skip_if(lambda : testing.against('mysql+mysqldb'), + 'db-api flaky') + @testing.fails_on_everything_except('postgresql+psycopg2', + 'postgresql+pypostgresql', 'mysql+mysqlconnector') def test_raw_python(self): - for conn in (testing.db, testing.db.connect()): - conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", - {'id':1, 'name':'jack'}) - conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", - {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'}) - conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", - id=4, name='sally') - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')] - conn.execute("delete from users") + for conn in testing.db, testing.db.connect(): + conn.execute('insert into users (user_id, user_name) ' + 'values (%(id)s, %(name)s)', {'id': 1, 'name' + : 'jack'}) + conn.execute('insert into users (user_id, user_name) ' + 'values (%(id)s, %(name)s)', {'id': 2, 'name' + : 'ed'}, {'id': 3, 'name': 'horse'}) + conn.execute('insert into users (user_id, user_name) ' + 'values (%(id)s, %(name)s)', id=4, name='sally' + ) + res = conn.execute('select * from users order by user_id') + assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3, + 'horse'), (4, 'sally')] + conn.execute('delete from users') @testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle') def test_raw_named(self): - for conn in (testing.db, testing.db.connect()): - conn.execute("insert into users (user_id, user_name) values (:id, :name)", - {'id':1, 'name':'jack'}) - conn.execute("insert into users (user_id, user_name) values (:id, :name)", - {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'}) - conn.execute("insert into users (user_id, user_name) values (:id, :name)", - id=4, name='sally') - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')] - conn.execute("delete from users") + for conn in testing.db, testing.db.connect(): + conn.execute('insert into users (user_id, user_name) ' + 'values (:id, :name)', {'id': 1, 'name': 'jack' + }) + conn.execute('insert into users (user_id, user_name) ' + 'values (:id, :name)', {'id': 2, 'name': 'ed' + }, {'id': 3, 'name': 'horse'}) + conn.execute('insert into users (user_id, user_name) ' + 'values (:id, :name)', id=4, name='sally') + res = conn.execute('select * from users order by user_id') + assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3, + 'horse'), (4, 'sally')] + conn.execute('delete from users') def test_exception_wrapping(self): - for conn in (testing.db, testing.db.connect()): + for conn in testing.db, testing.db.connect(): try: - conn.execute("osdjafioajwoejoasfjdoifjowejfoawejqoijwef") + conn.execute('osdjafioajwoejoasfjdoifjowejfoawejqoijwef' + ) assert False except tsa.exc.DBAPIError: assert True def test_empty_insert(self): """test that execute() interprets [] as a list with no params""" - result = testing.db.execute(users_autoinc.insert().values(user_name=bindparam('name')), []) - eq_(testing.db.execute(users_autoinc.select()).fetchall(), [ - (1, None) - ]) - + + result = \ + testing.db.execute(users_autoinc.insert(). + values(user_name=bindparam('name')), []) + eq_(testing.db.execute(users_autoinc.select()).fetchall(), [(1, + None)]) + def test_engine_level_options(self): - eng = engines.testing_engine(options={ - 'execution_options':{'foo':'bar'} - }) + eng = engines.testing_engine(options={'execution_options' + : {'foo': 'bar'}}) conn = eng.contextual_connect() eq_(conn._execution_options['foo'], 'bar') - eq_(conn.execution_options(bat='hoho')._execution_options['foo'], 'bar') - eq_(conn.execution_options(bat='hoho')._execution_options['bat'], 'hoho') - eq_(conn.execution_options(foo='hoho')._execution_options['foo'], 'hoho') - + eq_(conn.execution_options(bat='hoho')._execution_options['foo' + ], 'bar') + eq_(conn.execution_options(bat='hoho')._execution_options['bat' + ], 'hoho') + eq_(conn.execution_options(foo='hoho')._execution_options['foo' + ], 'hoho') eng.update_execution_options(foo='hoho') conn = eng.contextual_connect() eq_(conn._execution_options['foo'], 'hoho') @@ -134,7 +159,8 @@ class CompiledCacheTest(TestBase): global users, metadata metadata = MetaData(testing.db) users = Table('users', metadata, - Column('user_id', INT, primary_key=True, test_needs_autoincrement=True), + Column('user_id', INT, primary_key=True, + test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), ) metadata.create_all() @@ -177,7 +203,8 @@ class LogTest(TestBase): names = set([b.name for b in buf.buffer]) assert 'sqlalchemy.engine.base.Engine.%s' % (eng_name,) in names - assert 'sqlalchemy.pool.%s.%s' % (eng.pool.__class__.__name__, pool_name) in names + assert 'sqlalchemy.pool.%s.%s' % (eng.pool.__class__.__name__, + pool_name) in names def test_named_logger(self): options = {'echo':'debug', 'echo_pool':'debug', @@ -192,7 +219,8 @@ class LogTest(TestBase): def test_unnamed_logger(self): - eng = engines.testing_engine(options={'echo':'debug', 'echo_pool':'debug'}) + eng = engines.testing_engine(options={'echo': 'debug', + 'echo_pool': 'debug'}) self._test_logger( eng, "0x...%s" % hex(id(eng))[-4:], @@ -216,7 +244,8 @@ class ResultProxyTest(TestBase): def __getitem__(self, i): return list.__getitem__(self.l, i) - proxy = RowProxy(object(), MyList(['value']), [None], {'key': (None, 0), 0: (None, 0)}) + proxy = RowProxy(object(), MyList(['value']), [None], {'key' + : (None, 0), 0: (None, 0)}) eq_(list(proxy), ['value']) eq_(proxy[0], 'value') eq_(proxy['key'], 'value') @@ -250,12 +279,12 @@ class ResultProxyTest(TestBase): {}) try: - r = t.insert().execute({'data':'d1'}, {'data':'d2'}, {'data': 'd3'}) - eq_( - t.select().execute().fetchall(), - [('d1', ), ('d2',), ('d3', )] - ) - assert_raises(AssertionError, t.update().execute, {'data':'d4'}) + r = t.insert().execute({'data': 'd1'}, {'data': 'd2'}, + {'data': 'd3'}) + eq_(t.select().execute().fetchall(), [('d1', ), ('d2', ), + ('d3', )]) + assert_raises(AssertionError, t.update().execute, {'data' + : 'd4'}) assert_raises(AssertionError, t.delete().execute) finally: engine.dialect.execution_ctx_cls = execution_ctx_cls @@ -269,16 +298,27 @@ class ProxyConnectionTest(TestBase): cursor_stmts = [] class MyProxy(ConnectionProxy): - def execute(self, conn, execute, clauseelement, *multiparams, **params): - stmts.append( - (str(clauseelement), params,multiparams) - ) + def execute( + self, + conn, + execute, + clauseelement, + *multiparams, + **params + ): + stmts.append((str(clauseelement), params, multiparams)) return execute(clauseelement, *multiparams, **params) - def cursor_execute(self, execute, cursor, statement, parameters, context, executemany): - cursor_stmts.append( - (str(statement), parameters, None) - ) + def cursor_execute( + self, + execute, + cursor, + statement, + parameters, + context, + executemany, + ): + cursor_stmts.append((str(statement), parameters, None)) return execute(cursor, statement, parameters, context) def assert_stmts(expected, received): @@ -286,68 +326,65 @@ class ProxyConnectionTest(TestBase): if not received: assert False while received: - teststmt, testparams, testmultiparams = received.pop(0) - teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', teststmt).strip() - if teststmt.startswith(stmt) and (testparams==params or testparams==posn): + teststmt, testparams, testmultiparams = \ + received.pop(0) + teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', + teststmt).strip() + if teststmt.startswith(stmt) and (testparams + == params or testparams == posn): break - for engine in ( - engines.testing_engine(options=dict(implicit_returning=False, proxy=MyProxy())), - engines.testing_engine(options=dict( - implicit_returning=False, - proxy=MyProxy(), - strategy='threadlocal')) - ): + for engine in \ + engines.testing_engine(options=dict(implicit_returning=False, + proxy=MyProxy())), \ + engines.testing_engine(options=dict(implicit_returning=False, + proxy=MyProxy(), + strategy='threadlocal')): m = MetaData(engine) - t1 = Table('t1', m, - Column('c1', Integer, primary_key=True), - Column('c2', String(50), default=func.lower('Foo'), primary_key=True) + Column('c1', Integer, primary_key=True), + Column('c2', String(50), default=func.lower('Foo'), + primary_key=True) ) - m.create_all() try: t1.insert().execute(c1=5, c2='some data') t1.insert().execute(c1=6) - eq_(engine.execute("select * from t1").fetchall(), - [(5, 'some data'), (6, 'foo')] - ) + eq_(engine.execute('select * from t1').fetchall(), [(5, + 'some data'), (6, 'foo')]) finally: m.drop_all() - engine.dispose() - - compiled = [ - ("CREATE TABLE t1", {}, None), - ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, None), - ("INSERT INTO t1 (c1, c2)", {'c1': 6}, None), - ("select * from t1", {}, None), - ("DROP TABLE t1", {}, None) - ] - - if not testing.against('oracle+zxjdbc'): # or engine.dialect.preexecute_pk_sequences: + compiled = [('CREATE TABLE t1', {}, None), + ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', + 'c1': 5}, None), ('INSERT INTO t1 (c1, c2)', + {'c1': 6}, None), ('select * from t1', {}, + None), ('DROP TABLE t1', {}, None)] + if not testing.against('oracle+zxjdbc'): # or engine.dialect.pr + # eexecute_pk_sequence + # s: cursor = [ - ("CREATE TABLE t1", {}, ()), - ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, (5, 'some data')), - ("SELECT lower", {'lower_2':'Foo'}, ('Foo',)), - ("INSERT INTO t1 (c1, c2)", {'c2': 'foo', 'c1': 6}, (6, 'foo')), - ("select * from t1", {}, ()), - ("DROP TABLE t1", {}, ()) - ] + ('CREATE TABLE t1', {}, ()), + ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', 'c1' + : 5}, (5, 'some data')), + ('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )), + ('INSERT INTO t1 (c1, c2)', {'c2': 'foo', 'c1': 6}, + (6, 'foo')), + ('select * from t1', {}, ()), + ('DROP TABLE t1', {}, ()), + ] else: - insert2_params = (6, 'Foo') + insert2_params = 6, 'Foo' if testing.against('oracle+zxjdbc'): - from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam - insert2_params += (ReturningParam(12),) - cursor = [ - ("CREATE TABLE t1", {}, ()), - ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, (5, 'some data')), - # bind param name 'lower_2' might be incorrect - ("INSERT INTO t1 (c1, c2)", {'c1': 6, "lower_2":"Foo"}, insert2_params), - ("select * from t1", {}, ()), - ("DROP TABLE t1", {}, ()) - ] - + insert2_params += (ReturningParam(12), ) + cursor = [('CREATE TABLE t1', {}, ()), + ('INSERT INTO t1 (c1, c2)', {'c2': 'some data' + , 'c1': 5}, (5, 'some data')), + ('INSERT INTO t1 (c1, c2)', {'c1': 6, + 'lower_2': 'Foo'}, insert2_params), + ('select * from t1', {}, ()), ('DROP TABLE t1' + , {}, ())] # bind param name 'lower_2' might + # be incorrect assert_stmts(compiled, stmts) assert_stmts(cursor, cursor_stmts) @@ -389,8 +426,16 @@ class ProxyConnectionTest(TestBase): conn.execute(select([1])) trans.commit() - eq_(track, ['begin', 'execute', 'cursor_execute', - 'rollback', 'begin', 'execute', 'cursor_execute', 'commit']) + eq_(track, [ + 'begin', + 'execute', + 'cursor_execute', + 'rollback', + 'begin', + 'execute', + 'cursor_execute', + 'commit', + ]) @testing.requires.savepoints @testing.requires.two_phase_transactions diff --git a/test/engine/test_metadata.py b/test/engine/test_metadata.py index 4c7eee97b5..7ea7536219 100644 --- a/test/engine/test_metadata.py +++ b/test/engine/test_metadata.py @@ -1,18 +1,20 @@ from sqlalchemy.test.testing import assert_raises, assert_raises_message import pickle -from sqlalchemy import Integer, String, UniqueConstraint, CheckConstraint,\ - ForeignKey, MetaData, Sequence, ForeignKeyConstraint,\ - ColumnDefault +from sqlalchemy import Integer, String, UniqueConstraint, \ + CheckConstraint, ForeignKey, MetaData, Sequence, \ + ForeignKeyConstraint, ColumnDefault from sqlalchemy.test.schema import Table, Column from sqlalchemy import schema, exc import sqlalchemy as tsa -from sqlalchemy.test import TestBase, ComparesTables, AssertsCompiledSQL, testing, engines +from sqlalchemy.test import TestBase, ComparesTables, \ + AssertsCompiledSQL, testing, engines from sqlalchemy.test.testing import eq_ class MetaDataTest(TestBase, ComparesTables): def test_metadata_connect(self): metadata = MetaData() - t1 = Table('table1', metadata, Column('col1', Integer, primary_key=True), + t1 = Table('table1', metadata, + Column('col1', Integer, primary_key=True), Column('col2', String(20))) metadata.bind = testing.db metadata.create_all() @@ -42,12 +44,15 @@ class MetaDataTest(TestBase, ComparesTables): Column('foo', String(), nullable=False), Column('baz', String(), unique=True), Column(Integer(), primary_key=True), - Column('bar', Integer(), Sequence('foo_seq'), primary_key=True, key='bar'), + Column('bar', Integer(), Sequence('foo_seq'), primary_key=True, + key='bar'), Column(Integer(), ForeignKey('bat.blah')), - Column('bar', Integer(), ForeignKey('bat.blah'), primary_key=True, key='bar'), + Column('bar', Integer(), ForeignKey('bat.blah'), primary_key=True, + key='bar'), ]: c2 = col.copy() - for attr in ('name', 'type', 'nullable', 'primary_key', 'key', 'unique'): + for attr in ('name', 'type', 'nullable', + 'primary_key', 'key', 'unique'): eq_(getattr(col, attr), getattr(c2, attr)) eq_(len(col.foreign_keys), len(c2.foreign_keys)) if col.default: @@ -71,7 +76,8 @@ class MetaDataTest(TestBase, ComparesTables): def test_dupe_tables(self): metadata = MetaData() - t1 = Table('table1', metadata, Column('col1', Integer, primary_key=True), + t1 = Table('table1', metadata, + Column('col1', Integer, primary_key=True), Column('col2', String(20))) metadata.bind = testing.db @@ -79,11 +85,16 @@ class MetaDataTest(TestBase, ComparesTables): try: try: t1 = Table('table1', metadata, autoload=True) - t2 = Table('table1', metadata, Column('col1', Integer, primary_key=True), + t2 = Table('table1', metadata, + Column('col1', Integer, primary_key=True), Column('col2', String(20))) assert False except tsa.exc.InvalidRequestError, e: - assert str(e) == "Table 'table1' is already defined for this MetaData instance. Specify 'useexisting=True' to redefine options and columns on an existing Table object." + assert str(e) \ + == "Table 'table1' is already defined for this "\ + "MetaData instance. Specify 'useexisting=True' "\ + "to redefine options and columns on an existing "\ + "Table object." finally: metadata.drop_all() @@ -123,9 +134,12 @@ class MetaDataTest(TestBase, ComparesTables): table = Table('mytable', meta, Column('myid', Integer, Sequence('foo_id_seq'), primary_key=True), Column('name', String(40), nullable=True), - Column('foo', String(40), nullable=False, server_default='x', server_onupdate='q'), - Column('bar', String(40), nullable=False, default='y', onupdate='z'), - Column('description', String(30), CheckConstraint("description='hi'")), + Column('foo', String(40), nullable=False, server_default='x', + server_onupdate='q'), + Column('bar', String(40), nullable=False, default='y', + onupdate='z'), + Column('description', String(30), + CheckConstraint("description='hi'")), UniqueConstraint('name'), test_needs_fk=True, ) @@ -164,33 +178,37 @@ class MetaDataTest(TestBase, ComparesTables): meta.create_all(testing.db) try: - for test, has_constraints, reflect in ((test_to_metadata, True, False), (test_pickle, True, False),(test_pickle_via_reflect, False, True)): + for test, has_constraints, reflect in (test_to_metadata, + True, False), (test_pickle, True, False), \ + (test_pickle_via_reflect, False, True): table_c, table2_c = test() self.assert_tables_equal(table, table_c) self.assert_tables_equal(table2, table2_c) - assert table is not table_c assert table.primary_key is not table_c.primary_key - assert list(table2_c.c.myid.foreign_keys)[0].column is table_c.c.myid - assert list(table2_c.c.myid.foreign_keys)[0].column is not table.c.myid + assert list(table2_c.c.myid.foreign_keys)[0].column \ + is table_c.c.myid + assert list(table2_c.c.myid.foreign_keys)[0].column \ + is not table.c.myid assert 'x' in str(table_c.c.foo.server_default.arg) - if not reflect: assert isinstance(table_c.c.myid.default, Sequence) assert str(table_c.c.foo.server_onupdate.arg) == 'q' assert str(table_c.c.bar.default.arg) == 'y' - assert getattr(table_c.c.bar.onupdate.arg, 'arg', table_c.c.bar.onupdate.arg) == 'z' + assert getattr(table_c.c.bar.onupdate.arg, 'arg', + table_c.c.bar.onupdate.arg) == 'z' assert isinstance(table2_c.c.id.default, Sequence) - - # constraints dont get reflected for any dialect right now + + # constraints dont get reflected for any dialect right + # now + if has_constraints: for c in table_c.c.description.constraints: if isinstance(c, CheckConstraint): break else: assert False - assert str(c.sqltext)=="description='hi'" - + assert str(c.sqltext) == "description='hi'" for c in table_c.constraints: if isinstance(c, UniqueConstraint): break @@ -207,7 +225,8 @@ class MetaDataTest(TestBase, ComparesTables): table = Table('mytable', meta, Column('myid', Integer, primary_key=True), Column('name', String(40), nullable=True), - Column('description', String(30), CheckConstraint("description='hi'")), + Column('description', String(30), + CheckConstraint("description='hi'")), UniqueConstraint('name'), test_needs_fk=True, ) @@ -222,8 +241,10 @@ class MetaDataTest(TestBase, ComparesTables): table_c = table.tometadata(meta2, schema='someschema') table2_c = table2.tometadata(meta2, schema='someschema') - eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid == table2_c.c.myid)) - eq_(str(table_c.join(table2_c).onclause), "someschema.mytable.myid = someschema.othertable.myid") + eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid + == table2_c.c.myid)) + eq_(str(table_c.join(table2_c).onclause), + 'someschema.mytable.myid = someschema.othertable.myid') def test_tometadata_default_schema(self): meta = MetaData() @@ -231,7 +252,8 @@ class MetaDataTest(TestBase, ComparesTables): table = Table('mytable', meta, Column('myid', Integer, primary_key=True), Column('name', String(40), nullable=True), - Column('description', String(30), CheckConstraint("description='hi'")), + Column('description', String(30), + CheckConstraint("description='hi'")), UniqueConstraint('name'), test_needs_fk=True, schema='myschema', @@ -248,8 +270,10 @@ class MetaDataTest(TestBase, ComparesTables): table_c = table.tometadata(meta2) table2_c = table2.tometadata(meta2) - eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid == table2_c.c.myid)) - eq_(str(table_c.join(table2_c).onclause), "myschema.mytable.myid = myschema.othertable.myid") + eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid + == table2_c.c.myid)) + eq_(str(table_c.join(table2_c).onclause), + 'myschema.mytable.myid = myschema.othertable.myid') def test_manual_dependencies(self): meta = MetaData() @@ -276,7 +300,8 @@ class MetaDataTest(TestBase, ComparesTables): table = Table('mytable', meta, Column('myid', Integer, primary_key=True), Column('name', String(40), nullable=True), - Column('description', String(30), CheckConstraint("description='hi'")), + Column('description', String(30), + CheckConstraint("description='hi'")), UniqueConstraint('name'), test_needs_fk=True, ) @@ -291,8 +316,10 @@ class MetaDataTest(TestBase, ComparesTables): table_c = table.tometadata(meta2, schema=None) table2_c = table2.tometadata(meta2, schema=None) - eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid == table2_c.c.myid)) - eq_(str(table_c.join(table2_c).onclause), "mytable.myid = othertable.myid") + eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid + == table2_c.c.myid)) + eq_(str(table_c.join(table2_c).onclause), + 'mytable.myid = othertable.myid') def test_nonexistent(self): assert_raises(tsa.exc.NoSuchTableError, Table, diff --git a/test/engine/test_parseconnect.py b/test/engine/test_parseconnect.py index c00a831368..7fb8d8a907 100644 --- a/test/engine/test_parseconnect.py +++ b/test/engine/test_parseconnect.py @@ -26,46 +26,56 @@ class ParseConnectTest(TestBase): 'dbtype:///E:/work/src/LEM/db/hello.db?foo=bar&hoho=lala', 'dbtype://', 'dbtype://username:password@/db', - 'dbtype:////usr/local/mailman/lists/_xtest@example.com/members.db', + 'dbtype:////usr/local/mailman/lists/_xtest@example.com/memb' + 'ers.db', 'dbtype://username:apples%2Foranges@hostspec/mydatabase', - ): + ): u = url.make_url(text) assert u.drivername == 'dbtype' assert u.username == 'username' or u.username is None - assert u.password == 'password' or u.password == 'apples/oranges' or u.password is None - assert u.host == 'hostspec' or u.host == '127.0.0.1' or (not u.host) + assert u.password == 'password' or u.password \ + == 'apples/oranges' or u.password is None + assert u.host == 'hostspec' or u.host == '127.0.0.1' \ + or not u.host assert str(u) == text class DialectImportTest(TestBase): def test_import_base_dialects(self): + # the globals() somehow makes it for the exec() + nose3. - for name in ('mysql', 'firebird', 'postgresql', 'sqlite', 'oracle', 'mssql'): - exec("from sqlalchemy.dialects import %s\n" - "dialect = %s.dialect()" - % (name, name), globals()) + + for name in ( + 'mysql', + 'firebird', + 'postgresql', + 'sqlite', + 'oracle', + 'mssql', + ): + exec ('from sqlalchemy.dialects import %s\ndialect = ' + '%s.dialect()' % (name, name), globals()) eq_(dialect.name, name) class CreateEngineTest(TestBase): - """test that create_engine arguments of different types get propagated properly""" + """test that create_engine arguments of different types get + propagated properly""" + def test_connect_query(self): dbapi = MockDBAPI(foober='12', lala='18', fooz='somevalue') - - e = create_engine( - 'postgresql://scott:tiger@somehost/test?foober=12&lala=18&fooz=somevalue', - module=dbapi, - _initialize=False - ) + e = \ + create_engine('postgresql://scott:tiger@somehost/test?foobe' + 'r=12&lala=18&fooz=somevalue', module=dbapi, + _initialize=False) c = e.connect() def test_kwargs(self): - dbapi = MockDBAPI(foober=12, lala=18, hoho={'this':'dict'}, fooz='somevalue') - - e = create_engine( - 'postgresql://scott:tiger@somehost/test?fooz=somevalue', - connect_args={'foober':12, 'lala':18, 'hoho':{'this':'dict'}}, - module=dbapi, - _initialize=False - ) + dbapi = MockDBAPI(foober=12, lala=18, hoho={'this': 'dict'}, + fooz='somevalue') + e = \ + create_engine('postgresql://scott:tiger@somehost/test?fooz=' + 'somevalue', connect_args={'foober': 12, + 'lala': 18, 'hoho': {'this': 'dict'}}, + module=dbapi, _initialize=False) c = e.connect() def test_coerce_config(self): @@ -107,7 +117,8 @@ pool_timeout=10 } prefixed = dict(ini.items('prefixed')) - self.assert_(tsa.engine._coerce_config(prefixed, 'sqlalchemy.') == expected) + self.assert_(tsa.engine._coerce_config(prefixed, 'sqlalchemy.') + == expected) plain = dict(ini.items('plain')) self.assert_(tsa.engine._coerce_config(plain, '') == expected) @@ -115,53 +126,73 @@ pool_timeout=10 def test_engine_from_config(self): dbapi = mock_dbapi - config = { - 'sqlalchemy.url':'postgresql://scott:tiger@somehost/test?fooz=somevalue', - 'sqlalchemy.pool_recycle':'50', - 'sqlalchemy.echo':'true' - } + config = \ + {'sqlalchemy.url': 'postgresql://scott:tiger@somehost/test'\ + '?fooz=somevalue', 'sqlalchemy.pool_recycle': '50', + 'sqlalchemy.echo': 'true'} e = engine_from_config(config, module=dbapi, _initialize=False) assert e.pool._recycle == 50 - assert e.url == url.make_url('postgresql://scott:tiger@somehost/test?fooz=somevalue') + assert e.url \ + == url.make_url('postgresql://scott:tiger@somehost/test?foo' + 'z=somevalue') assert e.echo is True def test_custom(self): - dbapi = MockDBAPI(foober=12, lala=18, hoho={'this':'dict'}, fooz='somevalue') + dbapi = MockDBAPI(foober=12, lala=18, hoho={'this': 'dict'}, + fooz='somevalue') def connect(): - return dbapi.connect(foober=12, lala=18, fooz='somevalue', hoho={'this':'dict'}) + return dbapi.connect(foober=12, lala=18, fooz='somevalue', + hoho={'this': 'dict'}) - # start the postgresql dialect, but put our mock DBAPI as the module instead of psycopg - e = create_engine('postgresql://', creator=connect, module=dbapi, _initialize=False) + # start the postgresql dialect, but put our mock DBAPI as the + # module instead of psycopg + + e = create_engine('postgresql://', creator=connect, + module=dbapi, _initialize=False) c = e.connect() def test_recycle(self): - dbapi = MockDBAPI(foober=12, lala=18, hoho={'this':'dict'}, fooz='somevalue') - e = create_engine('postgresql://', pool_recycle=472, module=dbapi, _initialize=False) + dbapi = MockDBAPI(foober=12, lala=18, hoho={'this': 'dict'}, + fooz='somevalue') + e = create_engine('postgresql://', pool_recycle=472, + module=dbapi, _initialize=False) assert e.pool._recycle == 472 def test_badargs(self): - assert_raises(ImportError, create_engine, "foobar://", module=mock_dbapi) - - # bad arg - assert_raises(TypeError, create_engine, 'postgresql://', use_ansi=True, module=mock_dbapi) + assert_raises(ImportError, create_engine, 'foobar://', + module=mock_dbapi) # bad arg - assert_raises(TypeError, create_engine, 'oracle://', lala=5, use_ansi=True, module=mock_dbapi) - assert_raises(TypeError, create_engine, 'postgresql://', lala=5, module=mock_dbapi) + assert_raises(TypeError, create_engine, 'postgresql://', + use_ansi=True, module=mock_dbapi) - assert_raises(TypeError, create_engine,'sqlite://', lala=5, module=mock_sqlite_dbapi) + # bad arg - assert_raises(TypeError, create_engine, 'mysql+mysqldb://', use_unicode=True, module=mock_dbapi) + assert_raises( + TypeError, + create_engine, + 'oracle://', + lala=5, + use_ansi=True, + module=mock_dbapi, + ) + assert_raises(TypeError, create_engine, 'postgresql://', + lala=5, module=mock_dbapi) + assert_raises(TypeError, create_engine, 'sqlite://', lala=5, + module=mock_sqlite_dbapi) + assert_raises(TypeError, create_engine, 'mysql+mysqldb://', + use_unicode=True, module=mock_dbapi) # sqlite uses SingletonThreadPool which doesnt have max_overflow - assert_raises(TypeError, create_engine, 'sqlite://', max_overflow=5, - module=mock_sqlite_dbapi) + assert_raises(TypeError, create_engine, 'sqlite://', + max_overflow=5, module=mock_sqlite_dbapi) try: - e = create_engine('sqlite://', connect_args={'use_unicode':True}, convert_unicode=True) + e = create_engine('sqlite://', connect_args={'use_unicode' + : True}, convert_unicode=True) except ImportError: # no sqlite pass @@ -172,7 +203,8 @@ pool_timeout=10 def test_urlattr(self): """test the url attribute on ``Engine``.""" - e = create_engine('mysql://scott:tiger@localhost/test', module=mock_dbapi, _initialize=False) + e = create_engine('mysql://scott:tiger@localhost/test', + module=mock_dbapi, _initialize=False) u = url.make_url('mysql://scott:tiger@localhost/test') e2 = create_engine(u, module=mock_dbapi, _initialize=False) assert e.url.drivername == e2.url.drivername == 'mysql' @@ -181,40 +213,70 @@ pool_timeout=10 def test_poolargs(self): """test that connection pool args make it thru""" - e = create_engine('postgresql://', creator=None, pool_recycle=50, echo_pool=None, module=mock_dbapi, _initialize=False) + + e = create_engine( + 'postgresql://', + creator=None, + pool_recycle=50, + echo_pool=None, + module=mock_dbapi, + _initialize=False, + ) assert e.pool._recycle == 50 # these args work for QueuePool - e = create_engine('postgresql://', - max_overflow=8, pool_timeout=60, - poolclass=tsa.pool.QueuePool, module=mock_dbapi, - _initialize=False) + + e = create_engine( + 'postgresql://', + max_overflow=8, + pool_timeout=60, + poolclass=tsa.pool.QueuePool, + module=mock_dbapi, + _initialize=False, + ) # but not SingletonThreadPool - assert_raises(TypeError, create_engine, 'sqlite://', max_overflow=8, pool_timeout=60, - poolclass=tsa.pool.SingletonThreadPool, module=mock_sqlite_dbapi, - _initialize=False) + + assert_raises( + TypeError, + create_engine, + 'sqlite://', + max_overflow=8, + pool_timeout=60, + poolclass=tsa.pool.SingletonThreadPool, + module=mock_sqlite_dbapi, + _initialize=False, + ) class MockDBAPI(object): def __init__(self, **kwargs): self.kwargs = kwargs self.paramstyle = 'named' + def connect(self, *args, **kwargs): for k in self.kwargs: - assert k in kwargs, "key %s not present in dictionary" % k - assert kwargs[k]==self.kwargs[k], "value %s does not match %s" % (kwargs[k], self.kwargs[k]) + assert k in kwargs, 'key %s not present in dictionary' % k + assert kwargs[k] == self.kwargs[k], \ + 'value %s does not match %s' % (kwargs[k], + self.kwargs[k]) return MockConnection() + + class MockConnection(object): def get_server_info(self): - return "5.0" + return '5.0' + def close(self): pass + def cursor(self): return MockCursor() + class MockCursor(object): def close(self): pass + mock_dbapi = MockDBAPI() mock_sqlite_dbapi = msd = MockDBAPI() -msd.version_info = msd.sqlite_version_info = (99, 9, 9) +msd.version_info = msd.sqlite_version_info = 99, 9, 9 msd.sqlite_version = '99.9.9' diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 6b0b187e6f..9db65d2ab8 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -73,12 +73,13 @@ class PoolTest(PoolTestBase): self.assert_(connection.cursor() is not None) self.assert_(connection is not connection2) - @testing.fails_on('+pyodbc', "pyodbc cursor doesn't implement tuple __eq__") + @testing.fails_on('+pyodbc', + "pyodbc cursor doesn't implement tuple __eq__") def test_cursor_iterable(self): conn = testing.db.raw_connection() cursor = conn.cursor() cursor.execute(str(select([1], bind=testing.db))) - expected = [(1,)] + expected = [(1, )] for row in cursor: eq_(row, expected.pop(0)) @@ -108,10 +109,11 @@ class PoolTest(PoolTestBase): self._do_testthreadlocal(useclose=True) def _do_testthreadlocal(self, useclose=False): - for p in ( - pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True), - pool.SingletonThreadPool(creator = mock_dbapi.connect, use_threadlocal = True) - ): + for p in pool.QueuePool(creator=mock_dbapi.connect, + pool_size=3, max_overflow=-1, + use_threadlocal=True), \ + pool.SingletonThreadPool(creator=mock_dbapi.connect, + use_threadlocal=True): c1 = p.connect() c2 = p.connect() self.assert_(c1 is c2) @@ -129,7 +131,6 @@ class PoolTest(PoolTestBase): else: c2 = None lazy_gc() - if useclose: c1 = p.connect() c2 = p.connect() @@ -138,13 +139,13 @@ class PoolTest(PoolTestBase): c2.close() self.assert_(c1.connection is not None) c1.close() - c1 = c2 = c3 = None - # extra tests with QueuePool to ensure connections get __del__()ed when dereferenced + # extra tests with QueuePool to ensure connections get + # __del__()ed when dereferenced + if isinstance(p, pool.QueuePool): lazy_gc() - self.assert_(p.checkedout() == 0) c1 = p.connect() c2 = p.connect() @@ -210,7 +211,8 @@ class PoolTest(PoolTestBase): eq_(len(innerself.first_connected), fconn) eq_(len(innerself.checked_out), cout) eq_(len(innerself.checked_in), cin) - def assert_in(innerself, item, in_conn, in_fconn, in_cout, in_cin): + def assert_in(innerself, item, in_conn, in_fconn, + in_cout, in_cin): self.assert_((item in innerself.connected) == in_conn) self.assert_((item in innerself.first_connected) == in_fconn) self.assert_((item in innerself.checked_out) == in_cout) @@ -445,25 +447,28 @@ class QueuePoolTest(PoolTestBase): self._do_testqueuepool(useclose=True) def _do_testqueuepool(self, useclose=False): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = False) + p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=3, + max_overflow=-1, use_threadlocal=False) def status(pool): - tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout()) - print "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup + tup = pool.size(), pool.checkedin(), pool.overflow(), \ + pool.checkedout() + print 'Pool size: %d Connections in pool: %d Current '\ + 'Overflow: %d Current Checked out connections: %d' % tup return tup c1 = p.connect() - self.assert_(status(p) == (3,0,-2,1)) + self.assert_(status(p) == (3, 0, -2, 1)) c2 = p.connect() - self.assert_(status(p) == (3,0,-1,2)) + self.assert_(status(p) == (3, 0, -1, 2)) c3 = p.connect() - self.assert_(status(p) == (3,0,0,3)) + self.assert_(status(p) == (3, 0, 0, 3)) c4 = p.connect() - self.assert_(status(p) == (3,0,1,4)) + self.assert_(status(p) == (3, 0, 1, 4)) c5 = p.connect() - self.assert_(status(p) == (3,0,2,5)) + self.assert_(status(p) == (3, 0, 2, 5)) c6 = p.connect() - self.assert_(status(p) == (3,0,3,6)) + self.assert_(status(p) == (3, 0, 3, 6)) if useclose: c4.close() c3.close() @@ -471,8 +476,7 @@ class QueuePoolTest(PoolTestBase): else: c4 = c3 = c2 = None lazy_gc() - - self.assert_(status(p) == (3,3,3,3)) + self.assert_(status(p) == (3, 3, 3, 3)) if useclose: c1.close() c5.close() @@ -480,9 +484,7 @@ class QueuePoolTest(PoolTestBase): else: c1 = c5 = c6 = None lazy_gc() - - self.assert_(status(p) == (3,3,0,0)) - + self.assert_(status(p) == (3, 3, 0, 0)) c1 = p.connect() c2 = p.connect() self.assert_(status(p) == (3, 1, 0, 2), status(p)) @@ -491,16 +493,15 @@ class QueuePoolTest(PoolTestBase): else: c2 = None lazy_gc() - - self.assert_(status(p) == (3, 2, 0, 1)) - + self.assert_(status(p) == (3, 2, 0, 1)) c1.close() - lazy_gc() assert not pool._refs def test_timeout(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2) + p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=3, + max_overflow=0, use_threadlocal=False, + timeout=2) c1 = p.connect() c2 = p.connect() c3 = p.connect() @@ -591,7 +592,8 @@ class QueuePoolTest(PoolTestBase): self._test_overflow(40, 5) def test_mixed_close(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) + p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=3, + max_overflow=-1, use_threadlocal=True) c1 = p.connect() c2 = p.connect() assert c1 is c2 @@ -601,12 +603,12 @@ class QueuePoolTest(PoolTestBase): c1 = None lazy_gc() assert p.checkedout() == 0 - lazy_gc() assert not pool._refs - + def test_weakref_kaboom(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) + p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=3, + max_overflow=-1, use_threadlocal=True) c1 = p.connect() c2 = p.connect() c1.close() @@ -617,12 +619,16 @@ class QueuePoolTest(PoolTestBase): assert p.checkedout() == 0 c3 = p.connect() assert c3 is not None - + def test_trick_the_counter(self): - """this is a "flaw" in the connection pool; since threadlocal uses a single ConnectionFairy per thread - with an open/close counter, you can fool the counter into giving you a ConnectionFairy with an - ambiguous counter. i.e. its not true reference counting.""" - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) + """this is a "flaw" in the connection pool; since threadlocal + uses a single ConnectionFairy per thread with an open/close + counter, you can fool the counter into giving you a + ConnectionFairy with an ambiguous counter. i.e. its not true + reference counting.""" + + p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=3, + max_overflow=-1, use_threadlocal=True) c1 = p.connect() c2 = p.connect() assert c1 is c2 @@ -630,13 +636,13 @@ class QueuePoolTest(PoolTestBase): c2 = p.connect() c2.close() self.assert_(p.checkedout() != 0) - c2.close() self.assert_(p.checkedout() == 0) def test_recycle(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 1, max_overflow = 0, use_threadlocal = False, recycle=3) - + p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=1, + max_overflow=0, use_threadlocal=False, + recycle=3) c1 = p.connect() c_id = id(c1.connection) c1.close() @@ -644,96 +650,101 @@ class QueuePoolTest(PoolTestBase): assert id(c2.connection) == c_id c2.close() time.sleep(4) - c3= p.connect() + c3 = p.connect() assert id(c3.connection) != c_id - + def test_invalidate(self): dbapi = MockDBAPI() - p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) + p = pool.QueuePool(creator=lambda : dbapi.connect('foo.db'), + pool_size=1, max_overflow=0, + use_threadlocal=False) c1 = p.connect() c_id = c1.connection.id - c1.close(); c1=None + c1.close() + c1 = None c1 = p.connect() assert c1.connection.id == c_id c1.invalidate() c1 = None - c1 = p.connect() assert c1.connection.id != c_id - + def test_recreate(self): dbapi = MockDBAPI() - p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) + p = pool.QueuePool(creator=lambda : dbapi.connect('foo.db'), + pool_size=1, max_overflow=0, + use_threadlocal=False) p2 = p.recreate() assert p2.size() == 1 assert p2._use_threadlocal is False assert p2._max_overflow == 0 - + def test_reconnect(self): - """tests reconnect operations at the pool level. SA's engine/dialect includes another - layer of reconnect support for 'database was lost' errors.""" - + """tests reconnect operations at the pool level. SA's + engine/dialect includes another layer of reconnect support for + 'database was lost' errors.""" + dbapi = MockDBAPI() - p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) + p = pool.QueuePool(creator=lambda : dbapi.connect('foo.db'), + pool_size=1, max_overflow=0, + use_threadlocal=False) c1 = p.connect() c_id = c1.connection.id - c1.close(); c1=None - + c1.close() + c1 = None c1 = p.connect() assert c1.connection.id == c_id dbapi.raise_error = True c1.invalidate() c1 = None - c1 = p.connect() assert c1.connection.id != c_id - + def test_detach(self): dbapi = MockDBAPI() - p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) - + p = pool.QueuePool(creator=lambda : dbapi.connect('foo.db'), + pool_size=1, max_overflow=0, + use_threadlocal=False) c1 = p.connect() c1.detach() c_id = c1.connection.id - c2 = p.connect() assert c2.connection.id != c1.connection.id dbapi.raise_error = True - c2.invalidate() c2 = None - c2 = p.connect() assert c2.connection.id != c1.connection.id - con = c1.connection - assert not con.closed c1.close() assert con.closed - + def test_threadfairy(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) + p = pool.QueuePool(creator=mock_dbapi.connect, pool_size=3, + max_overflow=-1, use_threadlocal=True) c1 = p.connect() c1.close() c2 = p.connect() assert c2.connection is not None class SingletonThreadPoolTest(PoolTestBase): + def test_cleanup(self): - """test that the pool's connections are OK after cleanup() has been called.""" - - p = pool.SingletonThreadPool(creator = mock_dbapi.connect, pool_size=3) - + """test that the pool's connections are OK after cleanup() has + been called.""" + + p = pool.SingletonThreadPool(creator=mock_dbapi.connect, + pool_size=3) + def checkout(): for x in xrange(10): c = p.connect() assert c c.cursor() c.close() - time.sleep(.1) - + threads = [] for i in xrange(10): th = threading.Thread(target=checkout) @@ -741,7 +752,6 @@ class SingletonThreadPoolTest(PoolTestBase): threads.append(th) for th in threads: th.join() - assert len(p._all_conns) == 3 class NullPoolTest(PoolTestBase): diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index dddc445ac6..b92c880663 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -62,28 +62,33 @@ class MockReconnectTest(TestBase): db.dialect.is_disconnect = lambda e: isinstance(e, MockDisconnect) def test_reconnect(self): - """test that an 'is_disconnect' condition will invalidate the connection, and additionally - dispose the previous connection pool and recreate.""" - + """test that an 'is_disconnect' condition will invalidate the + connection, and additionally dispose the previous connection + pool and recreate.""" pid = id(db.pool) # make a connection + conn = db.connect() # connection works + conn.execute(select([1])) - # create a second connection within the pool, which we'll ensure also goes away + # create a second connection within the pool, which we'll ensure + # also goes away + conn2 = db.connect() conn2.close() # two connections opened total now + assert len(dbapi.connections) == 2 # set it to fail - dbapi.shutdown() + dbapi.shutdown() try: conn.execute(select([1])) assert False @@ -91,19 +96,20 @@ class MockReconnectTest(TestBase): pass # assert was invalidated + assert not conn.closed assert conn.invalidated # close shouldnt break - conn.close() + conn.close() assert id(db.pool) != pid # ensure all connections closed (pool was recycled) + gc_collect() assert len(dbapi.connections) == 0 - - conn =db.connect() + conn = db.connect() conn.execute(select([1])) conn.close() assert len(dbapi.connections) == 1 @@ -112,7 +118,6 @@ class MockReconnectTest(TestBase): conn = db.connect() trans = conn.begin() dbapi.shutdown() - try: conn.execute(select([1])) assert False @@ -120,34 +125,32 @@ class MockReconnectTest(TestBase): pass # assert was invalidated + gc_collect() assert len(dbapi.connections) == 0 assert not conn.closed assert conn.invalidated assert trans.is_active - try: conn.execute(select([1])) assert False except tsa.exc.InvalidRequestError, e: - assert str(e) == "Can't reconnect until invalid transaction is rolled back" - + assert str(e) \ + == "Can't reconnect until invalid transaction is "\ + "rolled back" assert trans.is_active - try: trans.commit() assert False except tsa.exc.InvalidRequestError, e: - assert str(e) == "Can't reconnect until invalid transaction is rolled back" - + assert str(e) \ + == "Can't reconnect until invalid transaction is "\ + "rolled back" assert trans.is_active - trans.rollback() assert not trans.is_active - conn.execute(select([1])) assert not conn.invalidated - assert len(dbapi.connections) == 1 def test_conn_reusable(self): @@ -256,7 +259,8 @@ class RealReconnectTest(TestBase): conn.close() def test_null_pool(self): - engine = engines.reconnecting_engine(options=dict(poolclass=pool.NullPool)) + engine = \ + engines.reconnecting_engine(options=dict(poolclass=pool.NullPool)) conn = engine.connect() eq_(conn.execute(select([1])).scalar(), 1) assert not conn.closed @@ -292,60 +296,52 @@ class RealReconnectTest(TestBase): def test_with_transaction(self): conn = engine.connect() - trans = conn.begin() - eq_(conn.execute(select([1])).scalar(), 1) assert not conn.closed - engine.test_shutdown() - try: conn.execute(select([1])) assert False except tsa.exc.DBAPIError, e: if not e.connection_invalidated: raise - assert not conn.closed assert conn.invalidated assert trans.is_active - try: conn.execute(select([1])) assert False except tsa.exc.InvalidRequestError, e: - assert str(e) == "Can't reconnect until invalid transaction is rolled back" - + assert str(e) \ + == "Can't reconnect until invalid transaction is "\ + "rolled back" assert trans.is_active - try: trans.commit() assert False except tsa.exc.InvalidRequestError, e: - assert str(e) == "Can't reconnect until invalid transaction is rolled back" - + assert str(e) \ + == "Can't reconnect until invalid transaction is "\ + "rolled back" assert trans.is_active - trans.rollback() assert not trans.is_active - assert conn.invalidated eq_(conn.execute(select([1])).scalar(), 1) assert not conn.invalidated class RecycleTest(TestBase): + def test_basic(self): - for threadlocal in (False, True): - engine = engines.reconnecting_engine(options={'pool_recycle':1, 'pool_threadlocal':threadlocal}) - + for threadlocal in False, True: + engine = engines.reconnecting_engine(options={'pool_recycle' + : 1, 'pool_threadlocal': threadlocal}) conn = engine.contextual_connect() eq_(conn.execute(select([1])).scalar(), 1) conn.close() - engine.test_shutdown() time.sleep(2) - conn = engine.contextual_connect() eq_(conn.execute(select([1])).scalar(), 1) conn.close() @@ -368,22 +364,22 @@ class InvalidateDuringResultTest(TestBase): meta.drop_all() engine.dispose() - @testing.fails_on('+mysqldb', "Buffers the result set and doesn't check for connection close") - @testing.fails_on('+pg8000', "Buffers the result set and doesn't check for connection close") + @testing.fails_on('+mysqldb', + "Buffers the result set and doesn't check for " + "connection close") + @testing.fails_on('+pg8000', + "Buffers the result set and doesn't check for " + "connection close") def test_invalidate_on_results(self): conn = engine.connect() - - result = conn.execute("select * from sometable") + result = conn.execute('select * from sometable') for x in xrange(20): result.fetchone() - engine.test_shutdown() try: - print "ghost result: %r" % result.fetchone() + print 'ghost result: %r' % result.fetchone() assert False except tsa.exc.DBAPIError, e: if not e.connection_invalidated: raise - assert conn.invalidated - diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py index be53b05492..1cbc9ab598 100644 --- a/test/engine/test_reflection.py +++ b/test/engine/test_reflection.py @@ -15,8 +15,10 @@ metadata, users = None, None class ReflectionTest(TestBase, ComparesTables): - @testing.exclude('mssql', '<', (10, 0, 0), 'Date is only supported on MSSQL 2008+') - @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely') + @testing.exclude('mssql', '<', (10, 0, 0), + 'Date is only supported on MSSQL 2008+') + @testing.exclude('mysql', '<', (4, 1, 1), + 'early types are squirrely') def test_basic_reflection(self): meta = MetaData(testing.db) @@ -39,20 +41,24 @@ class ReflectionTest(TestBase, ComparesTables): test_needs_fk=True, ) - addresses = Table('engine_email_addresses', meta, - Column('address_id', sa.Integer, primary_key = True), - Column('remote_user_id', sa.Integer, sa.ForeignKey(users.c.user_id)), + addresses = Table( + 'engine_email_addresses', + meta, + Column('address_id', sa.Integer, primary_key=True), + Column('remote_user_id', sa.Integer, + sa.ForeignKey(users.c.user_id)), Column('email_address', sa.String(20)), test_needs_fk=True, - ) + ) meta.create_all() try: meta2 = MetaData() - reflected_users = Table('engine_users', meta2, autoload=True, + reflected_users = Table('engine_users', meta2, + autoload=True, autoload_with=testing.db) - reflected_addresses = Table('engine_email_addresses', meta2, - autoload=True, autoload_with=testing.db) + reflected_addresses = Table('engine_email_addresses', + meta2, autoload=True, autoload_with=testing.db) self.assert_tables_equal(users, reflected_users) self.assert_tables_equal(addresses, reflected_addresses) finally: @@ -60,34 +66,33 @@ class ReflectionTest(TestBase, ComparesTables): def test_two_foreign_keys(self): meta = MetaData(testing.db) - t1 = Table('t1', meta, - Column('id', sa.Integer, primary_key=True), - Column('t2id', sa.Integer, sa.ForeignKey('t2.id')), - Column('t3id', sa.Integer, sa.ForeignKey('t3.id')), - test_needs_fk=True - ) - t2 = Table('t2', meta, - Column('id', sa.Integer, primary_key=True), - test_needs_fk=True - ) - t3 = Table('t3', meta, - Column('id', sa.Integer, primary_key=True), - test_needs_fk=True - ) + t1 = Table( + 't1', + meta, + Column('id', sa.Integer, primary_key=True), + Column('t2id', sa.Integer, sa.ForeignKey('t2.id')), + Column('t3id', sa.Integer, sa.ForeignKey('t3.id')), + test_needs_fk=True, + ) + t2 = Table('t2', meta, Column('id', sa.Integer, + primary_key=True), test_needs_fk=True) + t3 = Table('t3', meta, Column('id', sa.Integer, + primary_key=True), test_needs_fk=True) meta.create_all() try: meta2 = MetaData() - t1r, t2r, t3r = [Table(x, meta2, autoload=True, autoload_with=testing.db) for x in ('t1', 't2', 't3')] - + t1r, t2r, t3r = [Table(x, meta2, autoload=True, + autoload_with=testing.db) for x in ('t1', + 't2', 't3')] assert t1r.c.t2id.references(t2r.c.id) assert t1r.c.t3id.references(t3r.c.id) - finally: meta.drop_all() def test_nonexistent(self): meta = MetaData(testing.db) - assert_raises(sa.exc.NoSuchTableError, Table, "nonexistent", meta, autoload=True) + assert_raises(sa.exc.NoSuchTableError, Table, 'nonexistent', + meta, autoload=True) def test_include_columns(self): meta = MetaData(testing.db) @@ -156,7 +161,8 @@ class ReflectionTest(TestBase, ComparesTables): Column('data', sa.String(50)), ) t2 = Table('test2', meta, - Column('id', sa.Integer, sa.ForeignKey('test.id'), primary_key=True), + Column('id', sa.Integer, sa.ForeignKey('test.id'), + primary_key=True), Column('id2', sa.Integer, primary_key=True), Column('data', sa.String(50)), ) @@ -218,8 +224,9 @@ class ReflectionTest(TestBase, ComparesTables): table.drop() def test_override_pkfk(self): - """test that you can override columns which contain foreign keys to other reflected tables, - where the foreign key column is also a primary key column""" + """test that you can override columns which contain foreign keys + to other reflected tables, where the foreign key column is also + a primary key column""" meta = MetaData(testing.db) users = Table('users', meta, @@ -258,8 +265,9 @@ class ReflectionTest(TestBase, ComparesTables): meta.drop_all() def test_override_nonexistent_fk(self): - """test that you can override columns and create new foreign keys to other reflected tables - which have no foreign keys. this is common with MySQL MyISAM tables.""" + """test that you can override columns and create new foreign + keys to other reflected tables which have no foreign keys. this + is common with MySQL MyISAM tables.""" meta = MetaData(testing.db) users = Table('users', meta, @@ -273,38 +281,44 @@ class ReflectionTest(TestBase, ComparesTables): meta.create_all() try: meta2 = MetaData(testing.db) - a2 = Table('addresses', meta2, - Column('user_id', sa.Integer, sa.ForeignKey('users.id')), - autoload=True) + a2 = Table('addresses', meta2, + Column('user_id',sa.Integer, sa.ForeignKey('users.id')), + autoload=True) u2 = Table('users', meta2, autoload=True) - assert len(a2.c.user_id.foreign_keys) == 1 assert len(a2.foreign_keys) == 1 assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] - assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] - assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id - assert u2.join(a2).onclause.compare(u2.c.id==a2.c.user_id) - + assert [c.parent for c in a2.c.user_id.foreign_keys] \ + == [a2.c.user_id] + assert list(a2.c.user_id.foreign_keys)[0].parent \ + is a2.c.user_id + assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) meta3 = MetaData(testing.db) + u3 = Table('users', meta3, autoload=True) - a3 = Table('addresses', meta3, - Column('user_id', sa.Integer, sa.ForeignKey('users.id')), - autoload=True) - assert u3.join(a3).onclause.compare(u3.c.id==a3.c.user_id) + a3 = Table('addresses', meta3, Column('user_id', + sa.Integer, sa.ForeignKey('users.id')), + autoload=True) + assert u3.join(a3).onclause.compare(u3.c.id == a3.c.user_id) meta4 = MetaData(testing.db) - u4 = Table('users', meta4, - Column('id', sa.Integer, key='u_id', primary_key=True), - autoload=True) - a4 = Table('addresses', meta4, - Column('id', sa.Integer, key='street', primary_key=True), - Column('street', sa.String(30), key='user_id'), - Column('user_id', sa.Integer, sa.ForeignKey('users.u_id'), - key='id'), - autoload=True) - assert u4.join(a4).onclause.compare(u4.c.u_id==a4.c.id) + u4 = Table('users', meta4, + Column('id', sa.Integer, key='u_id', primary_key=True), + autoload=True) + + a4 = Table( + 'addresses', + meta4, + Column('id', sa.Integer, key='street', + primary_key=True), + Column('street', sa.String(30), key='user_id'), + Column('user_id', sa.Integer, sa.ForeignKey('users.u_id' + ), key='id'), + autoload=True, + ) + assert u4.join(a4).onclause.compare(u4.c.u_id == a4.c.id) assert list(u4.primary_key) == [u4.c.u_id] assert len(u4.columns) == 2 assert len(u4.constraints) == 1 @@ -331,10 +345,11 @@ class ReflectionTest(TestBase, ComparesTables): meta.create_all() try: m2 = MetaData(testing.db) - a2 = Table('a', m2, Column('x', sa.Integer, primary_key=True, key='x1'), autoload=True) + a2 = Table('a', m2, + Column('x', sa.Integer, primary_key=True, key='x1'), + autoload=True) b2 = Table('b', m2, autoload=True) - - assert a2.join(b2).onclause.compare(a2.c.x1==b2.c.y) + assert a2.join(b2).onclause.compare(a2.c.x1 == b2.c.y) assert b2.c.y.references(a2.c.x1) finally: meta.drop_all() @@ -368,9 +383,9 @@ class ReflectionTest(TestBase, ComparesTables): @testing.exclude('mysql', '<', (4, 1, 1), 'innodb funkiness') def test_override_existing_fk(self): - """test that you can override columns and specify new foreign keys to other reflected tables, - on columns which *do* already have that foreign key, and that the FK is not duped. - """ + """test that you can override columns and specify new foreign + keys to other reflected tables, on columns which *do* already + have that foreign key, and that the FK is not duped. """ meta = MetaData(testing.db) users = Table('users', meta, @@ -385,78 +400,88 @@ class ReflectionTest(TestBase, ComparesTables): meta.create_all() try: meta2 = MetaData(testing.db) - a2 = Table('addresses', meta2, - Column('user_id', sa.Integer, sa.ForeignKey('users.id')), - autoload=True) + a2 = Table('addresses', meta2, + Column('user_id',sa.Integer, sa.ForeignKey('users.id')), + autoload=True) u2 = Table('users', meta2, autoload=True) - s = sa.select([a2]) + assert s.c.user_id is not None assert len(a2.foreign_keys) == 1 assert len(a2.c.user_id.foreign_keys) == 1 assert len(a2.constraints) == 2 assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] - assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] - assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id - assert u2.join(a2).onclause.compare(u2.c.id==a2.c.user_id) - + assert [c.parent for c in a2.c.user_id.foreign_keys] \ + == [a2.c.user_id] + assert list(a2.c.user_id.foreign_keys)[0].parent \ + is a2.c.user_id + assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) + meta2 = MetaData(testing.db) - u2 = Table('users', meta2, - Column('id', sa.Integer, primary_key=True), - autoload=True) - a2 = Table('addresses', meta2, - Column('id', sa.Integer, primary_key=True), - Column('user_id', sa.Integer, sa.ForeignKey('users.id')), - autoload=True) - + u2 = Table('users', meta2, Column('id', sa.Integer, + primary_key=True), autoload=True) + a2 = Table('addresses', meta2, Column('id', sa.Integer, + primary_key=True), Column('user_id', sa.Integer, + sa.ForeignKey('users.id')), autoload=True) s = sa.select([a2]) + assert s.c.user_id is not None assert len(a2.foreign_keys) == 1 assert len(a2.c.user_id.foreign_keys) == 1 assert len(a2.constraints) == 2 assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] - assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] - assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id - assert u2.join(a2).onclause.compare(u2.c.id==a2.c.user_id) + assert [c.parent for c in a2.c.user_id.foreign_keys] \ + == [a2.c.user_id] + assert list(a2.c.user_id.foreign_keys)[0].parent \ + is a2.c.user_id + assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) + finally: meta.drop_all() @testing.exclude('mysql', '<', (4, 1, 1), 'innodb funkiness') def test_use_existing(self): meta = MetaData(testing.db) - users = Table('users', meta, + users = Table('users', meta, + Column('id', sa.Integer, primary_key=True), + Column('name', sa.String(30)), + test_needs_fk=True) + addresses = Table( + 'addresses', + meta, Column('id', sa.Integer, primary_key=True), - Column('name', sa.String(30)), - test_needs_fk=True) - addresses = Table('addresses', meta, - Column('id', sa.Integer,primary_key=True), Column('user_id', sa.Integer, sa.ForeignKey('users.id')), Column('data', sa.String(100)), - test_needs_fk=True) - + test_needs_fk=True, + ) meta.create_all() try: meta2 = MetaData(testing.db) - addresses = Table('addresses', meta2, Column('data', sa.Unicode), autoload=True) + addresses = Table('addresses', meta2, Column('data', + sa.Unicode), autoload=True) try: - users = Table('users', meta2, Column('name', sa.Unicode), autoload=True) + users = Table('users', meta2, Column('name', + sa.Unicode), autoload=True) assert False except sa.exc.InvalidRequestError, err: - assert str(err) == "Table 'users' is already defined for this MetaData instance. Specify 'useexisting=True' to redefine options and columns on an existing Table object." - - users = Table('users', meta2, Column('name', sa.Unicode), autoload=True, useexisting=True) + assert str(err) \ + == "Table 'users' is already defined for this "\ + "MetaData instance. Specify 'useexisting=True' "\ + "to redefine options and columns on an existing "\ + "Table object." + users = Table('users', meta2, Column('name', sa.Unicode), + autoload=True, useexisting=True) assert isinstance(users.c.name.type, sa.Unicode) - assert not users.quote - - users = Table('users', meta2, quote=True, autoload=True, useexisting=True) + users = Table('users', meta2, quote=True, autoload=True, + useexisting=True) assert users.quote - finally: meta.drop_all() def test_pks_not_uniques(self): - """test that primary key reflection not tripped up by unique indexes""" + """test that primary key reflection not tripped up by unique + indexes""" testing.db.execute(""" CREATE TABLE book ( @@ -484,9 +509,10 @@ class ReflectionTest(TestBase, ComparesTables): Column('slot', sa.String(128)), ) - assert_raises_message(sa.exc.InvalidRequestError, - "Could not find table 'pkgs' with which to generate a foreign key", - metadata.create_all) + assert_raises_message(sa.exc.InvalidRequestError, + "Could not find table 'pkgs' with which " + "to generate a foreign key", + metadata.create_all) def test_composite_pks(self): """test reflection of a composite primary key""" @@ -531,58 +557,67 @@ class ReflectionTest(TestBase, ComparesTables): Column('bar', sa.Integer), Column('lala', sa.Integer), Column('data', sa.String(50)), - sa.ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']), + sa.ForeignKeyConstraint(['foo', 'bar', 'lala'], + ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho' + ]), test_needs_fk=True, ) meta.create_all() try: meta2 = MetaData() - table = Table('multi', meta2, autoload=True, autoload_with=testing.db) - table2 = Table('multi2', meta2, autoload=True, autoload_with=testing.db) + table = Table('multi', meta2, autoload=True, + autoload_with=testing.db) + table2 = Table('multi2', meta2, autoload=True, + autoload_with=testing.db) self.assert_tables_equal(multi, table) self.assert_tables_equal(multi2, table2) j = sa.join(table, table2) - self.assert_(sa.and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause)) + self.assert_(sa.and_(table.c.multi_id == table2.c.foo, + table.c.multi_rev == table2.c.bar, + table.c.multi_hoho + == table2.c.lala).compare(j.onclause)) finally: meta.drop_all() @testing.crashes('oracle', 'FIXME: unknown, confirm not fails_on') def test_reserved(self): - # check a table that uses an SQL reserved name doesn't cause an error + + # check a table that uses an SQL reserved name doesn't cause an + # error + meta = MetaData(testing.db) - table_a = Table('select', meta, - Column('not', sa.Integer, primary_key=True), - Column('from', sa.String(12), nullable=False), - sa.UniqueConstraint('from', name='when')) + table_a = Table('select', meta, Column('not', sa.Integer, + primary_key=True), Column('from', + sa.String(12), nullable=False), + sa.UniqueConstraint('from', name='when')) sa.Index('where', table_a.c['from']) - # There's currently no way to calculate identifier case normalization - # in isolation, so... + # There's currently no way to calculate identifier case + # normalization in isolation, so... + if testing.against('firebird', 'oracle', 'maxdb'): check_col = 'TRUE' else: check_col = 'true' quoter = meta.bind.dialect.identifier_preparer.quote_identifier - - table_b = Table('false', meta, - Column('create', sa.Integer, primary_key=True), - Column('true', sa.Integer, sa.ForeignKey('select.not')), - sa.CheckConstraint('%s <> 1' % quoter(check_col), - name='limit')) - - table_c = Table('is', meta, - Column('or', sa.Integer, nullable=False, primary_key=True), - Column('join', sa.Integer, nullable=False, primary_key=True), - sa.PrimaryKeyConstraint('or', 'join', name='to')) - + + table_b = Table('false', meta, + Column('create', sa.Integer, primary_key=True), + Column('true', sa.Integer,sa.ForeignKey('select.not')), + sa.CheckConstraint('%s <> 1' + % quoter(check_col), name='limit') + ) + + table_c = Table('is', meta, + Column('or', sa.Integer, nullable=False, primary_key=True), + Column('join', sa.Integer, nullable=False, primary_key=True), + sa.PrimaryKeyConstraint('or', 'join', name='to') + ) index_c = sa.Index('else', table_c.c.join) - meta.create_all() - index_c.drop() - meta2 = MetaData(testing.db) try: table_a2 = Table('select', meta2, autoload=True) @@ -643,9 +678,9 @@ class ReflectionTest(TestBase, ComparesTables): m8 = MetaData(reflect=True) self.assert_(False) except sa.exc.ArgumentError, e: - self.assert_( - e.args[0] == - "A bind must be supplied in conjunction with reflect=True") + self.assert_(e.args[0] + == 'A bind must be supplied in ' + 'conjunction with reflect=True') finally: baseline.drop_all() @@ -710,33 +745,40 @@ class ReflectionTest(TestBase, ComparesTables): meta.drop_all() class CreateDropTest(TestBase): + @classmethod def setup_class(cls): global metadata, users metadata = MetaData() - users = Table('users', metadata, - Column('user_id', sa.Integer, sa.Sequence('user_id_seq', optional=True), primary_key=True), - Column('user_name', sa.String(40)), - ) - + users = Table('users', metadata, + Column('user_id', sa.Integer, + sa.Sequence('user_id_seq', optional=True), + primary_key=True), + Column('user_name',sa.String(40))) + addresses = Table('email_addresses', metadata, - Column('address_id', sa.Integer, sa.Sequence('address_id_seq', optional=True), primary_key = True), - Column('user_id', sa.Integer, sa.ForeignKey(users.c.user_id)), - Column('email_address', sa.String(40)), - ) - - orders = Table('orders', metadata, - Column('order_id', sa.Integer, sa.Sequence('order_id_seq', optional=True), primary_key = True), - Column('user_id', sa.Integer, sa.ForeignKey(users.c.user_id)), + Column('address_id', sa.Integer, + sa.Sequence('address_id_seq', optional=True), + primary_key=True), + Column('user_id', + sa.Integer, sa.ForeignKey(users.c.user_id)), + Column('email_address', sa.String(40))) + + orders = Table( + 'orders', + metadata, + Column('order_id', sa.Integer, sa.Sequence('order_id_seq', + optional=True), primary_key=True), + Column('user_id', sa.Integer, + sa.ForeignKey(users.c.user_id)), Column('description', sa.String(50)), Column('isopen', sa.Integer), - ) - - orderitems = Table('items', metadata, - Column('item_id', sa.INT, sa.Sequence('items_id_seq', optional=True), primary_key = True), - Column('order_id', sa.INT, sa.ForeignKey("orders")), - Column('item_name', sa.VARCHAR(50)), - ) + ) + orderitems = Table('items', metadata, Column('item_id', sa.INT, + sa.Sequence('items_id_seq', optional=True), + primary_key=True), Column('order_id', + sa.INT, sa.ForeignKey('orders')), + Column('item_name', sa.VARCHAR(50))) def test_sorter( self ): tables = metadata.sorted_tables @@ -777,10 +819,14 @@ class CreateDropTest(TestBase): def test_tablenames(self): metadata.create_all(bind=testing.db) - # we only check to see if all the explicitly created tables are there, rather than - # assertEqual -- the test db could have "extra" tables if there is a misconfigured - # template. (*cough* tsearch2 w/ the pg windows installer.) - self.assert_(not set(metadata.tables) - set(testing.db.table_names())) + + # we only check to see if all the explicitly created tables are + # there, rather than assertEqual -- the test db could have + # "extra" tables if there is a misconfigured template. (*cough* + # tsearch2 w/ the pg windows installer.) + + self.assert_(not set(metadata.tables) + - set(testing.db.table_names())) metadata.drop_all(bind=testing.db) class SchemaManipulationTest(TestBase): @@ -788,7 +834,9 @@ class SchemaManipulationTest(TestBase): meta = MetaData() users = Table('users', meta, Column('id', sa.Integer)) - addresses = Table('addresses', meta, Column('id', sa.Integer), Column('user_id', sa.Integer)) + addresses = Table('addresses', meta, + Column('id', sa.Integer), + Column('user_id', sa.Integer)) fk = sa.ForeignKeyConstraint(['user_id'],[users.c.id]) @@ -798,35 +846,46 @@ class SchemaManipulationTest(TestBase): assert addresses.constraints == set([addresses.primary_key, fk]) class UnicodeReflectionTest(TestBase): + @testing.requires.unicode_connections def test_basic(self): try: - # the 'convert_unicode' should not get in the way of the reflection - # process. reflecttable for oracle, postgresql (others?) expect non-unicode - # strings in result sets/bind params - bind = engines.utf8_engine(options={'convert_unicode':True}) - metadata = MetaData(bind) + # the 'convert_unicode' should not get in the way of the + # reflection process. reflecttable for oracle, postgresql + # (others?) expect non-unicode strings in result sets/bind + # params + + bind = engines.utf8_engine(options={'convert_unicode' + : True}) + metadata = MetaData(bind) if testing.against('sybase', 'maxdb', 'oracle', 'mssql'): names = set([u'plain']) else: - names = set([u'plain', u'Unit\u00e9ble', u'\u6e2c\u8a66']) - + names = set([u'plain', u'Unit\u00e9ble', u'\u6e2c\u8a66' + ]) for name in names: - Table(name, metadata, Column('id', sa.Integer, sa.Sequence(name + "_id_seq"), - primary_key=True)) + Table(name, metadata, Column('id', sa.Integer, + sa.Sequence(name + '_id_seq'), primary_key=True)) metadata.create_all() - reflected = set(bind.table_names()) + # Jython 2.5 on Java 5 lacks unicodedata.normalize - if not names.issubset(reflected) and hasattr(unicodedata, 'normalize'): - # Python source files in the utf-8 coding seem to normalize - # literals as NFC (and the above are explicitly NFC). Maybe - # this database normalizes NFD on reflection. - nfc = set([unicodedata.normalize('NFC', n) for n in names]) + + if not names.issubset(reflected) and hasattr(unicodedata, + 'normalize'): + + # Python source files in the utf-8 coding seem to + # normalize literals as NFC (and the above are + # explicitly NFC). Maybe this database normalizes NFD + # on reflection. + + nfc = set([unicodedata.normalize('NFC', n) for n in + names]) self.assert_(nfc == names) - # Yep. But still ensure that bulk reflection and create/drop - # work with either normalization. + + # Yep. But still ensure that bulk reflection and + # create/drop work with either normalization. r = MetaData(bind, reflect=True) r.drop_all() @@ -840,13 +899,12 @@ class SchemaTest(TestBase): def test_iteration(self): metadata = MetaData() - table1 = Table('table1', metadata, - Column('col1', sa.Integer, primary_key=True), - schema='someschema') - table2 = Table('table2', metadata, - Column('col1', sa.Integer, primary_key=True), - Column('col2', sa.Integer, sa.ForeignKey('someschema.table1.col1')), - schema='someschema') + table1 = Table('table1', metadata, Column('col1', sa.Integer, + primary_key=True), schema='someschema') + table2 = Table('table2', metadata, Column('col1', sa.Integer, + primary_key=True), Column('col2', sa.Integer, + sa.ForeignKey('someschema.table1.col1')), + schema='someschema') t1 = str(schema.CreateTable(table1).compile(bind=testing.db)) t2 = str(schema.CreateTable(table2).compile(bind=testing.db)) @@ -904,16 +962,17 @@ class HasSequenceTest(TestBase): @testing.requires.sequences def test_has_sequence(self): metadata = MetaData() - users = Table('users', metadata, - Column('user_id', sa.Integer, sa.Sequence('user_id_seq'), primary_key=True), - Column('user_name', sa.String(40)), - ) + users = Table('users', metadata, Column('user_id', sa.Integer, + sa.Sequence('user_id_seq'), primary_key=True), + Column('user_name', sa.String(40))) metadata.create_all(bind=testing.db) try: - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True) + eq_(testing.db.dialect.has_sequence(testing.db, + 'user_id_seq'), True) finally: metadata.drop_all(bind=testing.db) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), + False) @testing.requires.schemas @testing.requires.sequences @@ -923,14 +982,20 @@ class HasSequenceTest(TestBase): s2 = sa.Sequence('user_id_seq') testing.db.execute(schema.CreateSequence(s1)) testing.db.execute(schema.CreateSequence(s2)) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', schema=test_schema), True) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', + schema=test_schema), True) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), + True) testing.db.execute(schema.DropSequence(s1)) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', schema=test_schema), False) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', + schema=test_schema), False) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), + True) testing.db.execute(schema.DropSequence(s2)) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', schema=test_schema), False) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', + schema=test_schema), False) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), + False) # Tests related to engine.reflection @@ -1021,11 +1086,11 @@ class ReverseCasingReflectTest(TestBase, AssertsCompiledSQL): @testing.requires.denormalized_names def test_direct_quoting(self): m = MetaData(testing.db) - t = Table("weird_casing", m, autoload=True) - self.assert_compile( - t.select(), - 'SELECT weird_casing.col1, weird_casing."Col2", weird_casing."col3" FROM weird_casing' - ) + t = Table('weird_casing', m, autoload=True) + self.assert_compile(t.select(), + 'SELECT weird_casing.col1, ' + 'weird_casing."Col2", weird_casing."col3" ' + 'FROM weird_casing') class ComponentReflectionTest(TestBase): @@ -1087,7 +1152,7 @@ class ComponentReflectionTest(TestBase): def _test_get_columns(self, schema=None, table_type='table'): meta = MetaData(testing.db) - (users, addresses) = createTables(meta, schema) + users, addresses = createTables(meta, schema) table_names = ['users', 'email_addresses'] meta.create_all() if table_type == 'view': @@ -1095,36 +1160,41 @@ class ComponentReflectionTest(TestBase): table_names = ['users_v', 'email_addresses_v'] try: insp = Inspector(meta.bind) - for (table_name, table) in zip(table_names, (users, addresses)): + for table_name, table in zip(table_names, (users, + addresses)): schema_name = schema cols = insp.get_columns(table_name, schema=schema_name) self.assert_(len(cols) > 0, len(cols)) + # should be in order - for (i, col) in enumerate(table.columns): + + for i, col in enumerate(table.columns): eq_(col.name, cols[i]['name']) ctype = cols[i]['type'].__class__ ctype_def = col.type if isinstance(ctype_def, sa.types.TypeEngine): ctype_def = ctype_def.__class__ - + # Oracle returns Date for DateTime. - if testing.against('oracle') \ - and ctype_def in (sql_types.Date, sql_types.DateTime): - ctype_def = sql_types.Date - - # assert that the desired type and return type - # share a base within one of the generic types. - self.assert_( - len( - set( - ctype.__mro__ - ).intersection(ctype_def.__mro__) - .intersection([sql_types.Integer, sql_types.Numeric, - sql_types.DateTime, sql_types.Date, sql_types.Time, - sql_types.String, sql_types._Binary]) - ) > 0 - ,("%s(%s), %s(%s)" % (col.name, col.type, cols[i]['name'], - ctype))) + + if testing.against('oracle') and ctype_def \ + in (sql_types.Date, sql_types.DateTime): + ctype_def = sql_types.Date + + # assert that the desired type and return type share + # a base within one of the generic types. + + self.assert_(len(set(ctype.__mro__). + intersection(ctype_def.__mro__).intersection([ + sql_types.Integer, + sql_types.Numeric, + sql_types.DateTime, + sql_types.Date, + sql_types.Time, + sql_types.String, + sql_types._Binary, + ])) > 0, '%s(%s), %s(%s)' % (col.name, + col.type, cols[i]['name'], ctype)) finally: if table_type == 'view': dropViews(meta.bind, schema) diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index a9c2a68dd8..bc0985bec8 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -1,7 +1,10 @@ -from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message -import sys, time, threading +from sqlalchemy.test.testing import eq_, assert_raises, \ + assert_raises_message +import sys +import time +import threading from sqlalchemy import create_engine, MetaData, INT, VARCHAR, Sequence, \ - select, Integer, String, func, text, exc + select, Integer, String, func, text, exc from sqlalchemy.test.schema import Table from sqlalchemy.test.schema import Column from sqlalchemy.test import TestBase, testing @@ -76,36 +79,39 @@ class TransactionTest(TestBase): connection.close() def test_transaction_container(self): - + def go(conn, table, data): for d in data: conn.execute(table.insert(), d) - - testing.db.transaction(go, users, [dict(user_id=1, user_name='user1')]) - eq_(testing.db.execute(users.select()).fetchall(), [(1, 'user1')]) - - assert_raises(exc.DBAPIError, - testing.db.transaction, go, users, [ - {'user_id':2, 'user_name':'user2'}, - {'user_id':1, 'user_name':'user3'}, - ] - ) - eq_(testing.db.execute(users.select()).fetchall(), [(1, 'user1')]) + + testing.db.transaction(go, users, [dict(user_id=1, + user_name='user1')]) + eq_(testing.db.execute(users.select()).fetchall(), [(1, 'user1' + )]) + assert_raises(exc.DBAPIError, testing.db.transaction, go, + users, [{'user_id': 2, 'user_name': 'user2'}, + {'user_id': 1, 'user_name': 'user3'}]) + eq_(testing.db.execute(users.select()).fetchall(), [(1, 'user1' + )]) def test_nested_rollback(self): connection = testing.db.connect() - try: transaction = connection.begin() try: - connection.execute(users.insert(), user_id=1, user_name='user1') - connection.execute(users.insert(), user_id=2, user_name='user2') - connection.execute(users.insert(), user_id=3, user_name='user3') + connection.execute(users.insert(), user_id=1, + user_name='user1') + connection.execute(users.insert(), user_id=2, + user_name='user2') + connection.execute(users.insert(), user_id=3, + user_name='user3') trans2 = connection.begin() try: - connection.execute(users.insert(), user_id=4, user_name='user4') - connection.execute(users.insert(), user_id=5, user_name='user5') - raise Exception("uh oh") + connection.execute(users.insert(), user_id=4, + user_name='user4') + connection.execute(users.insert(), user_id=5, + user_name='user5') + raise Exception('uh oh') trans2.commit() except: trans2.rollback() @@ -116,7 +122,8 @@ class TransactionTest(TestBase): raise except Exception, e: try: - assert str(e) == 'uh oh' # and not "This transaction is inactive" + assert str(e) == 'uh oh' # and not "This transaction is + # inactive" finally: connection.close() @@ -143,9 +150,9 @@ class TransactionTest(TestBase): connection.execute(users.insert(), user_id=5, user_name='user5') trans2.commit() transaction.rollback() - self.assert_(connection.scalar("select count(1) from query_users") == 0) - - result = connection.execute("select * from query_users") + self.assert_(connection.scalar('select count(1) from ' + 'query_users') == 0) + result = connection.execute('select * from query_users') assert len(result.fetchall()) == 0 connection.close() @@ -163,9 +170,9 @@ class TransactionTest(TestBase): assert connection.in_transaction() transaction.commit() assert not connection.in_transaction() - self.assert_(connection.scalar("select count(1) from query_users") == 5) - - result = connection.execute("select * from query_users") + self.assert_(connection.scalar('select count(1) from ' + 'query_users') == 5) + result = connection.execute('select * from query_users') assert len(result.fetchall()) == 5 connection.close() @@ -183,9 +190,9 @@ class TransactionTest(TestBase): assert connection.in_transaction() transaction.close() assert not connection.in_transaction() - self.assert_(connection.scalar("select count(1) from query_users") == 0) - - result = connection.execute("select * from query_users") + self.assert_(connection.scalar('select count(1) from ' + 'query_users') == 0) + result = connection.execute('select * from query_users') assert len(result.fetchall()) == 0 connection.close() @@ -199,15 +206,15 @@ class TransactionTest(TestBase): trans2.rollback() connection.execute(users.insert(), user_id=3, user_name='user3') transaction.commit() - - eq_( - connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), - [(1,),(3,)] - ) + eq_(connection.execute(select([users.c.user_id]). + order_by(users.c.user_id)).fetchall(), + [(1, ), (3, )]) connection.close() @testing.requires.savepoints - @testing.crashes('oracle+zxjdbc', 'Errors out and causes subsequent tests to deadlock') + @testing.crashes('oracle+zxjdbc', + 'Errors out and causes subsequent tests to ' + 'deadlock') def test_nested_subtransaction_commit(self): connection = testing.db.connect() transaction = connection.begin() @@ -217,11 +224,9 @@ class TransactionTest(TestBase): trans2.commit() connection.execute(users.insert(), user_id=3, user_name='user3') transaction.commit() - - eq_( - connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), - [(1,),(2,),(3,)] - ) + eq_(connection.execute(select([users.c.user_id]). + order_by(users.c.user_id)).fetchall(), + [(1, ), (2, ), (3, )]) connection.close() @testing.requires.savepoints @@ -236,139 +241,115 @@ class TransactionTest(TestBase): trans3.rollback() connection.execute(users.insert(), user_id=4, user_name='user4') transaction.commit() - - eq_( - connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), - [(1,),(4,)] - ) + eq_(connection.execute(select([users.c.user_id]). + order_by(users.c.user_id)).fetchall(), + [(1, ), (4, )]) connection.close() @testing.requires.two_phase_transactions def test_two_phase_transaction(self): connection = testing.db.connect() - transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name='user1') transaction.prepare() transaction.commit() - transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=2, user_name='user2') transaction.commit() transaction.close() - transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=3, user_name='user3') transaction.rollback() - transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=4, user_name='user4') transaction.prepare() transaction.rollback() transaction.close() - - eq_( - connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), - [(1,),(2,)] - ) + eq_(connection.execute(select([users.c.user_id]). + order_by(users.c.user_id)).fetchall(), + [(1, ), (2, )]) connection.close() @testing.requires.two_phase_transactions @testing.requires.savepoints def test_mixed_two_phase_transaction(self): connection = testing.db.connect() - transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name='user1') - transaction2 = connection.begin() connection.execute(users.insert(), user_id=2, user_name='user2') - transaction3 = connection.begin_nested() connection.execute(users.insert(), user_id=3, user_name='user3') - transaction4 = connection.begin() connection.execute(users.insert(), user_id=4, user_name='user4') transaction4.commit() - transaction3.rollback() - connection.execute(users.insert(), user_id=5, user_name='user5') - transaction2.commit() - transaction.prepare() - transaction.commit() - - eq_( - connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), - [(1,),(2,),(5,)] - ) + eq_(connection.execute(select([users.c.user_id]). + order_by(users.c.user_id)).fetchall(), + [(1, ), (2, ), (5, )]) connection.close() @testing.requires.two_phase_transactions - @testing.crashes('mysql+oursql', 'Times out in full test runs only, causing subsequent tests to fail') - @testing.crashes('mysql+zxjdbc', 'Deadlocks, causing subsequent tests to fail') + @testing.crashes('mysql+oursql', + 'Times out in full test runs only, causing ' + 'subsequent tests to fail') + @testing.crashes('mysql+zxjdbc', + 'Deadlocks, causing subsequent tests to fail') @testing.fails_on('mysql', 'FIXME: unknown') def test_two_phase_recover(self): + # MySQL recovery doesn't currently seem to work correctly - # Prepared transactions disappear when connections are closed and even - # when they aren't it doesn't seem possible to use the recovery id. - connection = testing.db.connect() + # Prepared transactions disappear when connections are closed + # and even when they aren't it doesn't seem possible to use the + # recovery id. + connection = testing.db.connect() transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name='user1') transaction.prepare() - connection.close() connection2 = testing.db.connect() - - eq_( - connection2.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), - [] - ) - + eq_(connection2.execute(select([users.c.user_id]). + order_by(users.c.user_id)).fetchall(), + []) recoverables = connection2.recover_twophase() assert transaction.xid in recoverables - connection2.commit_prepared(transaction.xid, recover=True) - - eq_( - connection2.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), - [(1,)] - ) + eq_(connection2.execute(select([users.c.user_id]). + order_by(users.c.user_id)).fetchall(), + [(1, )]) connection2.close() @testing.requires.two_phase_transactions def test_multiple_two_phase(self): conn = testing.db.connect() - xa = conn.begin_twophase() conn.execute(users.insert(), user_id=1, user_name='user1') xa.prepare() xa.commit() - xa = conn.begin_twophase() conn.execute(users.insert(), user_id=2, user_name='user2') xa.prepare() xa.rollback() - xa = conn.begin_twophase() conn.execute(users.insert(), user_id=3, user_name='user3') xa.rollback() - xa = conn.begin_twophase() conn.execute(users.insert(), user_id=4, user_name='user4') xa.prepare() xa.commit() - - result = conn.execute(select([users.c.user_name]).order_by(users.c.user_id)) - eq_(result.fetchall(), [('user1',),('user4',)]) - + result = \ + conn.execute(select([users.c.user_name]). + order_by(users.c.user_id)) + eq_(result.fetchall(), [('user1', ), ('user4', )]) conn.close() class AutoRollbackTest(TestBase): + @classmethod def setup_class(cls): global metadata @@ -379,30 +360,32 @@ class AutoRollbackTest(TestBase): metadata.drop_all(testing.db) def test_rollback_deadlock(self): - """test that returning connections to the pool clears any object locks.""" + """test that returning connections to the pool clears any object + locks.""" + conn1 = testing.db.connect() conn2 = testing.db.connect() - users = Table('deadlock_users', metadata, - Column('user_id', INT, primary_key = True), - Column('user_name', VARCHAR(20)), - test_needs_acid=True, - ) + users = Table('deadlock_users', metadata, Column('user_id', + INT, primary_key=True), Column('user_name', + VARCHAR(20)), test_needs_acid=True) users.create(conn1) - conn1.execute("select * from deadlock_users") + conn1.execute('select * from deadlock_users') conn1.close() - # without auto-rollback in the connection pool's return() logic, this - # deadlocks in PostgreSQL, because conn1 is returned to the pool but - # still has a lock on "deadlock_users". - # comment out the rollback in pool/ConnectionFairy._close() to see ! + # without auto-rollback in the connection pool's return() logic, + # this deadlocks in PostgreSQL, because conn1 is returned to the + # pool but still has a lock on "deadlock_users". comment out the + # rollback in pool/ConnectionFairy._close() to see ! + users.drop(conn2) conn2.close() class ExplicitAutoCommitTest(TestBase): - """test the 'autocommit' flag on select() and text() objects. - Requires PostgreSQL so that we may define a custom function which modifies the database. - """ + """test the 'autocommit' flag on select() and text() objects. + + Requires PostgreSQL so that we may define a custom function which + modifies the database. """ __only_on__ = 'postgresql' @@ -410,112 +393,97 @@ class ExplicitAutoCommitTest(TestBase): def setup_class(cls): global metadata, foo metadata = MetaData(testing.db) - foo = Table('foo', metadata, - Column('id', Integer, primary_key=True), - Column('data', String(100)) - ) + foo = Table('foo', metadata, Column('id', Integer, + primary_key=True), Column('data', String(100))) metadata.create_all() - testing.db.execute("create function insert_foo(varchar) returns integer " - "as 'insert into foo(data) values ($1);select 1;' language sql") + testing.db.execute("create function insert_foo(varchar) " + "returns integer as 'insert into foo(data) " + "values ($1);select 1;' language sql") def teardown(self): foo.delete().execute().close() @classmethod def teardown_class(cls): - testing.db.execute("drop function insert_foo(varchar)") + testing.db.execute('drop function insert_foo(varchar)') metadata.drop_all() def test_control(self): + # test that not using autocommit does not commit + conn1 = testing.db.connect() conn2 = testing.db.connect() - conn1.execute(select([func.insert_foo('data1')])) assert conn2.execute(select([foo.c.data])).fetchall() == [] - conn1.execute(text("select insert_foo('moredata')")) assert conn2.execute(select([foo.c.data])).fetchall() == [] - trans = conn1.begin() trans.commit() - - assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',), ('moredata',)] - + assert conn2.execute(select([foo.c.data])).fetchall() \ + == [('data1', ), ('moredata', )] conn1.close() conn2.close() def test_explicit_compiled(self): conn1 = testing.db.connect() conn2 = testing.db.connect() - - conn1.execute(select([func.insert_foo('data1')]).execution_options(autocommit=True)) - assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',)] - + conn1.execute(select([func.insert_foo('data1' + )]).execution_options(autocommit=True)) + assert conn2.execute(select([foo.c.data])).fetchall() \ + == [('data1', )] conn1.close() conn2.close() def test_explicit_connection(self): conn1 = testing.db.connect() conn2 = testing.db.connect() - conn1.execution_options(autocommit=True).\ - execute(select([func.insert_foo('data1')])) - eq_( - conn2.execute(select([foo.c.data])).fetchall(), - [('data1',), ] - ) + execute(select([func.insert_foo('data1' + )])) + eq_(conn2.execute(select([foo.c.data])).fetchall(), [('data1', + )]) # connection supercedes statement + conn1.execution_options(autocommit=False).\ - execute( - select([func.insert_foo('data2')]). - execution_options(autocommit=True) - ) - eq_( - conn2.execute(select([foo.c.data])).fetchall(), - [('data1',), ] - ) - + execute(select([func.insert_foo('data2' + )]).execution_options(autocommit=True)) + eq_(conn2.execute(select([foo.c.data])).fetchall(), [('data1', + )]) + # ditto - conn1.execution_options(autocommit=True).\ - execute( - select([func.insert_foo('data3')]). - execution_options(autocommit=False) - ) - eq_( - conn2.execute(select([foo.c.data])).fetchall(), - [('data1',), ('data2', ), ('data3',)] - ) + conn1.execution_options(autocommit=True).\ + execute(select([func.insert_foo('data3' + )]).execution_options(autocommit=False)) + eq_(conn2.execute(select([foo.c.data])).fetchall(), [('data1', + ), ('data2', ), ('data3', )]) conn1.close() conn2.close() def test_explicit_text(self): conn1 = testing.db.connect() conn2 = testing.db.connect() - - conn1.execute( - text("select insert_foo('moredata')"). - execution_options(autocommit=True) - ) - assert conn2.execute(select([foo.c.data])).fetchall() == [('moredata',)] - + conn1.execute(text("select insert_foo('moredata')" + ).execution_options(autocommit=True)) + assert conn2.execute(select([foo.c.data])).fetchall() \ + == [('moredata', )] conn1.close() conn2.close() @testing.uses_deprecated(r'autocommit on select\(\) is deprecated', - r'autocommit\(\) is deprecated') + r'autocommit\(\) is deprecated') def test_explicit_compiled_deprecated(self): conn1 = testing.db.connect() conn2 = testing.db.connect() - - conn1.execute(select([func.insert_foo('data1')], autocommit=True)) - assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',)] - + conn1.execute(select([func.insert_foo('data1')], + autocommit=True)) + assert conn2.execute(select([foo.c.data])).fetchall() \ + == [('data1', )] conn1.execute(select([func.insert_foo('data2')]).autocommit()) - assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',), ('data2',)] - + assert conn2.execute(select([foo.c.data])).fetchall() \ + == [('data1', ), ('data2', )] conn1.close() conn2.close() @@ -523,36 +491,38 @@ class ExplicitAutoCommitTest(TestBase): def test_explicit_text_deprecated(self): conn1 = testing.db.connect() conn2 = testing.db.connect() - - conn1.execute(text("select insert_foo('moredata')", autocommit=True)) - assert conn2.execute(select([foo.c.data])).fetchall() == [('moredata',)] - + conn1.execute(text("select insert_foo('moredata')", + autocommit=True)) + assert conn2.execute(select([foo.c.data])).fetchall() \ + == [('moredata', )] conn1.close() conn2.close() def test_implicit_text(self): conn1 = testing.db.connect() conn2 = testing.db.connect() - - conn1.execute(text("insert into foo (data) values ('implicitdata')")) - assert conn2.execute(select([foo.c.data])).fetchall() == [('implicitdata',)] - + conn1.execute(text("insert into foo (data) values " + "('implicitdata')")) + assert conn2.execute(select([foo.c.data])).fetchall() \ + == [('implicitdata', )] conn1.close() conn2.close() tlengine = None + + class TLTransactionTest(TestBase): + @classmethod def setup_class(cls): global users, metadata, tlengine tlengine = create_engine(testing.db.url, strategy='threadlocal') metadata = MetaData() - users = Table('query_users', metadata, - Column('user_id', INT, Sequence('query_users_id_seq', optional=True), primary_key=True), - Column('user_name', VARCHAR(20)), - test_needs_acid=True, - ) + users = Table('query_users', metadata, Column('user_id', INT, + Sequence('query_users_id_seq', optional=True), + primary_key=True), Column('user_name', + VARCHAR(20)), test_needs_acid=True) metadata.create_all(tlengine) def teardown(self): @@ -562,14 +532,16 @@ class TLTransactionTest(TestBase): def teardown_class(cls): metadata.drop_all(tlengine) tlengine.dispose() - + def setup(self): + # ensure tests start with engine closed + tlengine.close() - + def test_connection_close(self): - """test that when connections are closed for real, - transactions are rolled back and disposed.""" + """test that when connections are closed for real, transactions + are rolled back and disposed.""" c = tlengine.contextual_connect() c.begin() @@ -586,14 +558,12 @@ class TLTransactionTest(TestBase): tlengine.execute(users.insert(), user_id=3, user_name='user3') tlengine.execute(users.insert(), user_id=4, user_name='user4') t2.close() - - result = c.execute("select * from query_users") + result = c.execute('select * from query_users') assert len(result.fetchall()) == 4 - t.close() - external_connection = tlengine.connect() - result = external_connection.execute("select * from query_users") + result = external_connection.execute('select * from query_users' + ) try: assert len(result.fetchall()) == 0 finally: @@ -602,14 +572,15 @@ class TLTransactionTest(TestBase): def test_rollback(self): """test a basic rollback""" + tlengine.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.execute(users.insert(), user_id=2, user_name='user2') tlengine.execute(users.insert(), user_id=3, user_name='user3') tlengine.rollback() - external_connection = tlengine.connect() - result = external_connection.execute("select * from query_users") + result = external_connection.execute('select * from query_users' + ) try: assert len(result.fetchall()) == 0 finally: @@ -617,14 +588,15 @@ class TLTransactionTest(TestBase): def test_commit(self): """test a basic commit""" + tlengine.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.execute(users.insert(), user_id=2, user_name='user2') tlengine.execute(users.insert(), user_id=3, user_name='user3') tlengine.commit() - external_connection = tlengine.connect() - result = external_connection.execute("select * from query_users") + result = external_connection.execute('select * from query_users' + ) try: assert len(result.fetchall()) == 3 finally: @@ -632,38 +604,38 @@ class TLTransactionTest(TestBase): def test_commits(self): connection = tlengine.connect() - assert connection.execute("select count(1) from query_users").scalar() == 0 + assert connection.execute('select count(1) from query_users' + ).scalar() == 0 connection.close() - connection = tlengine.contextual_connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') transaction.commit() - transaction = connection.begin() connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') transaction.commit() - transaction = connection.begin() - result = connection.execute("select * from query_users") + result = connection.execute('select * from query_users') l = result.fetchall() - assert len(l) == 3, "expected 3 got %d" % len(l) + assert len(l) == 3, 'expected 3 got %d' % len(l) transaction.commit() connection.close() def test_rollback_off_conn(self): - # test that a TLTransaction opened off a TLConnection allows that - # TLConnection to be aware of the transactional context + + # test that a TLTransaction opened off a TLConnection allows + # that TLConnection to be aware of the transactional context + conn = tlengine.contextual_connect() trans = conn.begin() conn.execute(users.insert(), user_id=1, user_name='user1') conn.execute(users.insert(), user_id=2, user_name='user2') conn.execute(users.insert(), user_id=3, user_name='user3') trans.rollback() - external_connection = tlengine.connect() - result = external_connection.execute("select * from query_users") + result = external_connection.execute('select * from query_users' + ) try: assert len(result.fetchall()) == 0 finally: @@ -671,8 +643,10 @@ class TLTransactionTest(TestBase): external_connection.close() def test_morerollback_off_conn(self): - # test that an existing TLConnection automatically takes place in a TLTransaction - # opened on a second TLConnection + + # test that an existing TLConnection automatically takes place + # in a TLTransaction opened on a second TLConnection + conn = tlengine.contextual_connect() conn2 = tlengine.contextual_connect() trans = conn2.begin() @@ -680,9 +654,9 @@ class TLTransactionTest(TestBase): conn.execute(users.insert(), user_id=2, user_name='user2') conn.execute(users.insert(), user_id=3, user_name='user3') trans.rollback() - external_connection = tlengine.connect() - result = external_connection.execute("select * from query_users") + result = external_connection.execute('select * from query_users' + ) try: assert len(result.fetchall()) == 0 finally: @@ -697,9 +671,9 @@ class TLTransactionTest(TestBase): conn.execute(users.insert(), user_id=2, user_name='user2') conn.execute(users.insert(), user_id=3, user_name='user3') trans.commit() - external_connection = tlengine.connect() - result = external_connection.execute("select * from query_users") + result = external_connection.execute('select * from query_users' + ) try: assert len(result.fetchall()) == 3 finally: @@ -708,9 +682,10 @@ class TLTransactionTest(TestBase): def test_nesting_rollback(self): """tests nesting of transactions, rollback at the end""" - + external_connection = tlengine.connect() - self.assert_(external_connection.connection is not tlengine.contextual_connect().connection) + self.assert_(external_connection.connection + is not tlengine.contextual_connect().connection) tlengine.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.execute(users.insert(), user_id=2, user_name='user2') @@ -721,15 +696,18 @@ class TLTransactionTest(TestBase): tlengine.commit() tlengine.rollback() try: - self.assert_(external_connection.scalar("select count(1) from query_users") == 0) + self.assert_(external_connection.scalar( + 'select count(1) from query_users' + ) == 0) finally: external_connection.close() def test_nesting_commit(self): """tests nesting of transactions, commit at the end.""" - + external_connection = tlengine.connect() - self.assert_(external_connection.connection is not tlengine.contextual_connect().connection) + self.assert_(external_connection.connection + is not tlengine.contextual_connect().connection) tlengine.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.execute(users.insert(), user_id=2, user_name='user2') @@ -740,15 +718,19 @@ class TLTransactionTest(TestBase): tlengine.commit() tlengine.commit() try: - self.assert_(external_connection.scalar("select count(1) from query_users") == 5) + self.assert_(external_connection.scalar( + 'select count(1) from query_users' + ) == 5) finally: external_connection.close() def test_mixed_nesting(self): - """tests nesting of transactions off the TLEngine directly inside of - tranasctions off the connection from the TLEngine""" + """tests nesting of transactions off the TLEngine directly + inside of tranasctions off the connection from the TLEngine""" + external_connection = tlengine.connect() - self.assert_(external_connection.connection is not tlengine.contextual_connect().connection) + self.assert_(external_connection.connection + is not tlengine.contextual_connect().connection) conn = tlengine.contextual_connect() trans = conn.begin() trans2 = conn.begin() @@ -768,15 +750,19 @@ class TLTransactionTest(TestBase): trans.rollback() conn.close() try: - self.assert_(external_connection.scalar("select count(1) from query_users") == 0) + self.assert_(external_connection.scalar( + 'select count(1) from query_users' + ) == 0) finally: external_connection.close() def test_more_mixed_nesting(self): - """tests nesting of transactions off the connection from the TLEngine - inside of tranasctions off thbe TLEngine directly.""" + """tests nesting of transactions off the connection from the + TLEngine inside of tranasctions off thbe TLEngine directly.""" + external_connection = tlengine.connect() - self.assert_(external_connection.connection is not tlengine.contextual_connect().connection) + self.assert_(external_connection.connection + is not tlengine.contextual_connect().connection) tlengine.begin() connection = tlengine.contextual_connect() connection.execute(users.insert(), user_id=1, user_name='user1') @@ -791,13 +777,14 @@ class TLTransactionTest(TestBase): tlengine.rollback() connection.close() try: - self.assert_(external_connection.scalar("select count(1) from query_users") == 0) + self.assert_(external_connection.scalar( + 'select count(1) from query_users' + ) == 0) finally: external_connection.close() @testing.requires.savepoints def test_nested_subtransaction_rollback(self): - tlengine.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.begin_nested() @@ -806,15 +793,15 @@ class TLTransactionTest(TestBase): tlengine.execute(users.insert(), user_id=3, user_name='user3') tlengine.commit() tlengine.close() - - eq_( - tlengine.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), - [(1,),(3,)] - ) + eq_(tlengine.execute(select([users.c.user_id]). + order_by(users.c.user_id)).fetchall(), + [(1, ), (3, )]) tlengine.close() @testing.requires.savepoints - @testing.crashes('oracle+zxjdbc', 'Errors out and causes subsequent tests to deadlock') + @testing.crashes('oracle+zxjdbc', + 'Errors out and causes subsequent tests to ' + 'deadlock') def test_nested_subtransaction_commit(self): tlengine.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') @@ -823,12 +810,10 @@ class TLTransactionTest(TestBase): tlengine.commit() tlengine.execute(users.insert(), user_id=3, user_name='user3') tlengine.commit() - tlengine.close() - eq_( - tlengine.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), - [(1,),(2,),(3,)] - ) + eq_(tlengine.execute(select([users.c.user_id]). + order_by(users.c.user_id)).fetchall(), + [(1, ), (2, ), (3, )]) tlengine.close() @testing.requires.savepoints @@ -840,21 +825,18 @@ class TLTransactionTest(TestBase): tlengine.begin() tlengine.execute(users.insert(), user_id=3, user_name='user3') tlengine.rollback() - tlengine.rollback() - tlengine.execute(users.insert(), user_id=4, user_name='user4') tlengine.commit() tlengine.close() - - eq_( - tlengine.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), - [(1,),(4,)] - ) + eq_(tlengine.execute(select([users.c.user_id]). + order_by(users.c.user_id)).fetchall(), + [(1, ), (4, )]) tlengine.close() def test_connections(self): """tests that contextual_connect is threadlocal""" + c1 = tlengine.contextual_connect() c2 = tlengine.contextual_connect() assert c1.connection is c2.connection @@ -865,7 +847,7 @@ class TLTransactionTest(TestBase): @testing.requires.independent_cursors def test_result_closing(self): """tests that contextual_connect is threadlocal""" - + r1 = tlengine.execute(select([1])) r2 = tlengine.execute(select([1])) row1 = r1.fetchone() @@ -874,62 +856,57 @@ class TLTransactionTest(TestBase): assert r2.connection is r1.connection assert not r2.connection.closed assert not tlengine.closed - - # close again, nothing happens - # since resultproxy calls close() only - # once + + # close again, nothing happens since resultproxy calls close() + # only once + r1.close() assert r2.connection is r1.connection assert not r2.connection.closed assert not tlengine.closed - r2.close() assert r2.connection.closed assert tlengine.closed - + def test_dispose(self): eng = create_engine(testing.db.url, strategy='threadlocal') result = eng.execute(select([1])) eng.dispose() eng.execute(select([1])) - - + @testing.requires.two_phase_transactions def test_two_phase_transaction(self): tlengine.begin_twophase() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.prepare() tlengine.commit() - tlengine.begin_twophase() tlengine.execute(users.insert(), user_id=2, user_name='user2') tlengine.commit() - tlengine.begin_twophase() tlengine.execute(users.insert(), user_id=3, user_name='user3') tlengine.rollback() - tlengine.begin_twophase() tlengine.execute(users.insert(), user_id=4, user_name='user4') tlengine.prepare() tlengine.rollback() - - eq_( - tlengine.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), - [(1,),(2,)] - ) + eq_(tlengine.execute(select([users.c.user_id]). + order_by(users.c.user_id)).fetchall(), + [(1, ), (2, )]) counters = None + + class ForUpdateTest(TestBase): + @classmethod def setup_class(cls): global counters, metadata metadata = MetaData() counters = Table('forupdate_counters', metadata, - Column('counter_id', INT, primary_key = True), - Column('counter_value', INT), - test_needs_acid=True, - ) + Column('counter_id', INT, primary_key=True), + Column('counter_value', INT), + test_needs_acid=True) counters.create(testing.db) def teardown(self): @@ -939,26 +916,29 @@ class ForUpdateTest(TestBase): def teardown_class(cls): counters.drop(testing.db) - def increment(self, count, errors, update_style=True, delay=0.005): + def increment( + self, + count, + errors, + update_style=True, + delay=0.005, + ): con = testing.db.connect() sel = counters.select(for_update=update_style, - whereclause=counters.c.counter_id==1) - + whereclause=counters.c.counter_id == 1) for i in xrange(count): trans = con.begin() try: existing = con.execute(sel).first() incr = existing['counter_value'] + 1 - time.sleep(delay) - con.execute(counters.update(counters.c.counter_id==1, - values={'counter_value':incr})) + con.execute(counters.update(counters.c.counter_id == 1, + values={'counter_value': incr})) time.sleep(delay) - readback = con.execute(sel).first() - if (readback['counter_value'] != incr): - raise AssertionError("Got %s post-update, expected %s" % - (readback['counter_value'], incr)) + if readback['counter_value'] != incr: + raise AssertionError('Got %s post-update, expected ' + '%s' % (readback['counter_value'], incr)) trans.commit() except Exception, e: trans.rollback() @@ -973,36 +953,40 @@ class ForUpdateTest(TestBase): @testing.requires.independent_connections def test_queued_update(self): """Test SELECT FOR UPDATE with concurrent modifications. - - Runs concurrent modifications on a single row in the users table, - with each mutator trying to increment a value stored in user_name. - + + Runs concurrent modifications on a single row in the users + table, with each mutator trying to increment a value stored in + user_name. + """ + db = testing.db db.execute(counters.insert(), counter_id=1, counter_value=0) - iterations, thread_count = 10, 5 threads, errors = [], [] for i in xrange(thread_count): thrd = threading.Thread(target=self.increment, - args=(iterations,), - kwargs={'errors': errors, - 'update_style': True}) + args=(iterations, ), + kwargs={'errors': errors, + 'update_style': True}) thrd.start() threads.append(thrd) for thrd in threads: thrd.join() - for e in errors: - sys.stdout.write("Failure: %s\n" % e) - + sys.stdout.write('Failure: %s\n' % e) self.assert_(len(errors) == 0) - - sel = counters.select(whereclause=counters.c.counter_id==1) + sel = counters.select(whereclause=counters.c.counter_id == 1) final = db.execute(sel).first() - self.assert_(final['counter_value'] == iterations * thread_count) - - def overlap(self, ids, errors, update_style): + self.assert_(final['counter_value'] == iterations + * thread_count) + + def overlap( + self, + ids, + errors, + update_style, + ): sel = counters.select(for_update=update_style, whereclause=counters.c.counter_id.in_(ids)) con = testing.db.connect() @@ -1016,20 +1000,26 @@ class ForUpdateTest(TestBase): errors.append(e) con.close() - def _threaded_overlap(self, thread_count, groups, update_style=True, pool=5): + def _threaded_overlap( + self, + thread_count, + groups, + update_style=True, + pool=5, + ): db = testing.db for cid in range(pool - 1): - db.execute(counters.insert(), counter_id=cid + 1, counter_value=0) - + db.execute(counters.insert(), counter_id=cid + 1, + counter_value=0) errors, threads = [], [] for i in xrange(thread_count): thrd = threading.Thread(target=self.overlap, - args=(groups.pop(0), errors, update_style)) + args=(groups.pop(0), errors, + update_style)) thrd.start() threads.append(thrd) for thrd in threads: thrd.join() - return errors @testing.crashes('mssql', 'FIXME: unknown') @@ -1040,9 +1030,9 @@ class ForUpdateTest(TestBase): def test_queued_select(self): """Simple SELECT FOR UPDATE conflict test""" - errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)]) + errors = self._threaded_overlap(2, [(1, 2, 3), (3, 4, 5)]) for e in errors: - sys.stderr.write("Failure: %s\n" % e) + sys.stderr.write('Failure: %s\n' % e) self.assert_(len(errors) == 0) @testing.crashes('mssql', 'FIXME: unknown') @@ -1054,8 +1044,7 @@ class ForUpdateTest(TestBase): def test_nowait_select(self): """Simple SELECT FOR UPDATE NOWAIT conflict test""" - errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)], - update_style='nowait') + errors = self._threaded_overlap(2, [(1, 2, 3), (3, 4, 5)], + update_style='nowait') self.assert_(len(errors) != 0) - -- 2.47.2