this uses pool events but bypasses the pool's fairy/record/dispose services. pypy still seems to expose
some holes in that at least as far as what some (or maybe just one, cant find it yet) of the tests does.
haven't tested this too deeply, just on sqlite + postgres, cypthon 2.7 + pypy. will see what the buildbot
says
components individually will fail.
"""
-
+ __requires__ = 'cpython',
__only_on__ = 'postgresql+psycopg2'
__skip_if__ = lambda : sys.version_info < (2, 5),
"""
+ __requires__ = 'cpython',
__only_on__ = 'postgresql+psycopg2'
__skip_if__ = lambda : sys.version_info < (2, 5),
db_opts['server_side_cursors'] = True
def _zero_timeout(options, opt_str, value, parser):
- db_opts['pool_timeout'] = 0
+ warnings.warn("--zero-timeout testing option is now on in all cases")
def _engine_strategy(options, opt_str, value, parser):
if value:
fn(self.options, file_config)
def begin(self):
- global testing, requires, util, fixtures
- from test.lib import testing, requires, fixtures
+ global testing, requires, util, fixtures, engines
+ from test.lib import testing, requires, fixtures, engines
from sqlalchemy import util
testing.db = db
testing.resetwarnings()
def afterTest(self, test):
+ engines.testing_reaper._after_test_ctx()
testing.resetwarnings()
- def afterContext(self):
+ def stopContext(self, ctx):
+ engines.testing_reaper._stop_test_ctx()
testing.global_cleanup_assertions()
#def handleError(self, test, err):
import re
from sqlalchemy.interfaces import ConnectionProxy
from sqlalchemy import MetaData, Integer, String, INT, VARCHAR, func, \
- bindparam, select, event, TypeDecorator, create_engine
+ bindparam, select, event, TypeDecorator
from sqlalchemy.sql import column, literal
from test.lib.schema import Table, Column
import sqlalchemy as tsa
from test.lib import testing, engines
+from test.lib.engines import testing_engine
import logging
from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam
from sqlalchemy.engine import base, default
assert len(self.buf.buffer) == 4
class ResultProxyTest(fixtures.TestBase):
+
def test_nontuple_row(self):
"""ensure the C version of BaseRowProxy handles
duck-type-dependent rows."""
@classmethod
def setup_class(cls):
- from sqlalchemy.engine import base, create_engine, default
- cls.engine = engine = create_engine('sqlite://')
+ from sqlalchemy.engine import base, default
+ cls.engine = engine = testing_engine('sqlite://')
m = MetaData()
cls.table = t = Table('test', m,
Column('x', Integer, primary_key=True),
break
def test_per_engine_independence(self):
- e1 = create_engine(config.db_url)
- e2 = create_engine(config.db_url)
+ e1 = testing_engine(config.db_url)
+ e2 = testing_engine(config.db_url)
canary = []
def before_exec(conn, stmt, *arg):
canary.append('be3')
event.listen(Engine, "before_execute", be1)
- e1 = create_engine(config.db_url)
- e2 = create_engine(config.db_url)
+ e1 = testing_engine(config.db_url)
+ e2 = testing_engine(config.db_url)
event.listen(e1, "before_execute", be2)
def after_execute(conn, clauseelement, multiparams, params, result):
assert isinstance(multiparams, (list, tuple))
assert isinstance(params, dict)
- e1 = create_engine(config.db_url)
+ e1 = testing_engine(config.db_url)
event.listen(e1, 'before_execute', before_execute)
event.listen(e1, 'after_execute', after_execute)
import threading, time
-from sqlalchemy import pool, interfaces, create_engine, select, event
+from sqlalchemy import pool, interfaces, select, event
import sqlalchemy as tsa
from test.lib import testing
from test.lib.util import gc_collect, lazy_gc
from test.lib.testing import eq_, assert_raises
+from test.lib.engines import testing_engine
from test.lib import fixtures
mcid = 1
-class PoolEventsTest(PoolTestBase):
+class PoolEventsTest(object): #PoolTestBase):
def _first_connect_event_fixture(self):
p = self._queuepool_fixture()
canary = []
def listen_four(*args):
canary.append("listen_four")
- engine = create_engine(testing.db.url)
+ engine = testing_engine(testing.db.url)
event.listen(pool.Pool, 'connect', listen_one)
event.listen(engine.pool, 'connect', listen_two)
event.listen(engine, 'connect', listen_three)
from test.lib.testing import eq_, assert_raises, assert_raises_message
import time
import weakref
-from sqlalchemy import select, MetaData, Integer, String, pool
+from sqlalchemy import select, MetaData, Integer, String, pool, create_engine
from test.lib.schema import Table, Column
import sqlalchemy as tsa
from test.lib import testing, engines
from test.lib.util import gc_collect
from sqlalchemy import exc
from test.lib import fixtures
+from test.lib.engines import testing_engine
class MockDisconnect(Exception):
pass
global db, dbapi
dbapi = MockDBAPI()
- db = tsa.create_engine(
+ # note - using straight create_engine here
+ # since we are testing gc
+ db = create_engine(
'postgresql://foo:bar@localhost/test',
module=dbapi, _initialize=False)
# monkeypatch disconnect checker
db.dialect.is_disconnect = lambda e, conn, cursor: isinstance(e, MockDisconnect)
+ def teardown(self):
+ db.dispose()
+
def test_reconnect(self):
"""test that an 'is_disconnect' condition will invalidate the
connection, and additionally dispose the previous connection
dbapi = MDBAPI()
- db = tsa.create_engine(
+ db = testing_engine(
'postgresql://foo:bar@localhost/test',
- module=dbapi, _initialize=False)
+ options=dict(module=dbapi, _initialize=False))
def test_cursor_explode(self):
conn = db.connect()
import sys
import time
import threading
+from test.lib.engines import testing_engine
from sqlalchemy import create_engine, MetaData, INT, VARCHAR, Sequence, \
select, Integer, String, func, text, exc
from test.lib.schema import Table
@classmethod
def setup_class(cls):
global users, metadata, tlengine
- tlengine = create_engine(testing.db.url, strategy='threadlocal')
+ tlengine = testing_engine(options=dict(strategy='threadlocal'))
metadata = MetaData()
users = Table('query_users', metadata, Column('user_id', INT,
Sequence('query_users_id_seq', optional=True),
@classmethod
def teardown_class(cls):
+ tlengine.close()
metadata.drop_all(tlengine)
tlengine.dispose()
@testing.crashes('oracle', 'TNS error of unknown origin occurs on the buildbot.')
def test_rollback_no_trans(self):
- tlengine = create_engine(testing.db.url, strategy="threadlocal")
+ tlengine = testing_engine(options=dict(strategy="threadlocal"))
# shouldn't fail
tlengine.rollback()
tlengine.rollback()
def test_commit_no_trans(self):
- tlengine = create_engine(testing.db.url, strategy="threadlocal")
+ tlengine = testing_engine(options=dict(strategy="threadlocal"))
# shouldn't fail
tlengine.commit()
tlengine.commit()
def test_prepare_no_trans(self):
- tlengine = create_engine(testing.db.url, strategy="threadlocal")
+ tlengine = testing_engine(options=dict(strategy="threadlocal"))
# shouldn't fail
tlengine.prepare()
@testing.crashes('oracle+cx_oracle', 'intermittent failures on the buildbot')
def test_dispose(self):
- eng = create_engine(testing.db.url, strategy='threadlocal')
+ eng = testing_engine(options=dict(strategy='threadlocal'))
result = eng.execute(select([1]))
eng.dispose()
eng.execute(select([1]))
def test_engine_param_stays(self):
- eng = create_engine(testing.db.url)
+ eng = testing_engine()
isolation_level = eng.dialect.get_isolation_level(eng.connect().connection)
level = self._non_default_isolation_level()
ne_(isolation_level, level)
- eng = create_engine(testing.db.url,
- isolation_level=level)
+ eng = testing_engine(options=dict(isolation_level=level))
eq_(
eng.dialect.get_isolation_level(eng.connect().connection),
level
conn.close()
def test_default_level(self):
- eng = create_engine(testing.db.url)
+ eng = testing_engine(options=dict())
isolation_level = eng.dialect.get_isolation_level(eng.connect().connection)
eq_(isolation_level, self._default_isolation_level())
def test_reset_level(self):
- eng = create_engine(testing.db.url)
+ eng = testing_engine(options=dict())
conn = eng.connect()
eq_(eng.dialect.get_isolation_level(conn.connection), self._default_isolation_level())
conn.close()
def test_reset_level_with_setting(self):
- eng = create_engine(testing.db.url, isolation_level=self._non_default_isolation_level())
+ eng = testing_engine(options=dict(isolation_level=self._non_default_isolation_level()))
conn = eng.connect()
eq_(eng.dialect.get_isolation_level(conn.connection), self._non_default_isolation_level())
conn.close()
def test_invalid_level(self):
- eng = create_engine(testing.db.url, isolation_level='FOO')
+ eng = testing_engine(options=dict(isolation_level='FOO'))
assert_raises_message(
exc.ArgumentError,
"Invalid value '%s' for isolation_level. "
def test_per_connection(self):
from sqlalchemy.pool import QueuePool
- eng = create_engine(testing.db.url, poolclass=QueuePool, pool_size=2, max_overflow=0)
+ eng = testing_engine(options=dict(poolclass=QueuePool, pool_size=2, max_overflow=0))
c1 = eng.connect()
c1 = c1.execution_options(isolation_level=self._non_default_isolation_level())
from sqlalchemy.ext.horizontal_shard import ShardedSession
from sqlalchemy.sql import operators
from test.lib import *
+from test.lib.engines import testing_engine
from test.lib.testing import eq_
from nose import SkipTest
global db1, db2, db3, db4, weather_locations, weather_reports
try:
- db1 = create_engine('sqlite:///shard1.db', pool_threadlocal=True)
+ db1 = testing_engine('sqlite:///shard1.db', options=dict(pool_threadlocal=True))
except ImportError:
raise SkipTest('Requires sqlite')
- db2 = create_engine('sqlite:///shard2.db')
- db3 = create_engine('sqlite:///shard3.db')
- db4 = create_engine('sqlite:///shard4.db')
+ db2 = testing_engine('sqlite:///shard2.db')
+ db3 = testing_engine('sqlite:///shard3.db')
+ db4 = testing_engine('sqlite:///shard4.db')
meta = MetaData()
ids = Table('ids', meta,
from test.bootstrap import config
from test.lib.util import decorator
from sqlalchemy.util import callable
-from sqlalchemy import event
+from sqlalchemy import event, pool
+from sqlalchemy.engine import base as engine_base
import re
import warnings
class ConnectionKiller(object):
def __init__(self):
self.proxy_refs = weakref.WeakKeyDictionary()
+ self.testing_engines = weakref.WeakKeyDictionary()
+ self.conns = set()
+
+ def add_engine(self, engine):
+ self.testing_engines[engine] = True
def checkout(self, dbapi_con, con_record, con_proxy):
self.proxy_refs[con_proxy] = True
+ self.conns.add(dbapi_con)
+
+ def _safe(self, fn):
+ try:
+ fn()
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except Exception, e:
+ warnings.warn(
+ "testing_reaper couldn't "
+ "rollback/close connection: %s" % e)
- def _apply_all(self, methods):
- # must copy keys atomically
+ def rollback_all(self):
for rec in self.proxy_refs.keys():
if rec is not None and rec.is_valid:
- try:
- for name in methods:
- if callable(name):
- name(rec)
- else:
- getattr(rec, name)()
- except (SystemExit, KeyboardInterrupt):
- raise
- except Exception, e:
- warnings.warn("testing_reaper couldn't close connection: %s" % e)
-
- def rollback_all(self):
- self._apply_all(('rollback',))
+ self._safe(rec.rollback)
def close_all(self):
- self._apply_all(('rollback', 'close'))
+ for rec in self.proxy_refs.keys():
+ if rec is not None:
+ self._safe(rec._close)
+
+ def _after_test_ctx(self):
+ for conn in self.conns:
+ self._safe(conn.rollback)
+
+ def _stop_test_ctx(self):
+ self.close_all()
+ for conn in self.conns:
+ self._safe(conn.close)
+ self.conns = set()
+ for rec in self.testing_engines.keys():
+ rec.dispose()
def assert_all_closed(self):
for rec in self.proxy_refs:
options = options or config.db_opts
engine = create_engine(url, **options)
+ if isinstance(engine.pool, pool.QueuePool):
+ engine.pool._timeout = 0
+ engine.pool._max_overflow = 0
event.listen(engine, 'after_execute', asserter.execute)
event.listen(engine, 'after_cursor_execute', asserter.cursor_execute)
event.listen(engine.pool, 'checkout', testing_reaper.checkout)
# may want to call this, results
# in first-connect initializers
#engine.connect()
+ testing_reaper.add_engine(engine)
return engine
"""
testutil.lazy_gc()
- assert not pool._refs
+ assert not pool._refs, str(pool._refs)
def against(*queries):
"""Boolean predicate, compares to testing database configuration.