def add_engine(self, engine):
self.testing_engines[engine] = True
+ def connect(self, dbapi_conn, con_record):
+ self.conns.add(dbapi_conn)
+
def checkout(self, dbapi_con, con_record, con_proxy):
self.proxy_refs[con_proxy] = True
- self.conns.add(dbapi_con)
def _safe(self, fn):
try:
def _stop_test_ctx_minimal(self):
from test.lib import testing
self.close_all()
+
+ self.conns = set()
+
for rec in self.testing_engines.keys():
if rec is not testing.db:
rec.dispose()
self.connections.append(conn)
return conn
+ def _safe(self, fn):
+ try:
+ fn()
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except Exception, e:
+ warnings.warn(
+ "ReconnectFixture couldn't "
+ "close connection: %s" % e)
+
def shutdown(self):
# TODO: this doesn't cover all cases
# as nicely as we'd like, namely MySQLdb.
# proxy server idea to get better
# coverage.
for c in list(self.connections):
- c.close()
+ self._safe(c.close)
self.connections = []
def reconnecting_engine(url=None, options=None):
options = {}
options['module'] = ReconnectFixture(dbapi)
engine = testing_engine(url, options)
+ _dispose = engine.dispose
+ def dispose():
+ engine.dialect.dbapi.shutdown()
+ _dispose()
engine.test_shutdown = engine.dialect.dbapi.shutdown
+ engine.dispose = dispose
return engine
def testing_engine(url=None, options=None):
event.listen(engine, 'after_execute', asserter.execute)
event.listen(engine, 'after_cursor_execute', asserter.cursor_execute)
if use_reaper:
+ event.listen(engine.pool, 'connect', testing_reaper.connect)
event.listen(engine.pool, 'checkout', testing_reaper.checkout)
testing_reaper.add_engine(engine)