from .base import _ConnectionFairy # noqa
from .base import _ConnectionRecord # noqa
from .base import _finalize_fairy # noqa
-from .base import _refs # noqa
from .base import Pool
from .base import reset_commit
from .base import reset_none
lambda ref: _finalize_fairy
and _finalize_fairy(None, rec, pool, ref, echo),
)
- _refs.add(rec)
if echo:
pool.logger.debug(
"Connection %r checked out from pool", dbapi_connection
been garbage collected.
"""
- _refs.discard(connection_record)
if ref is not None:
if connection_record.fairy_ref is not ref:
connection_record.checkin()
-_refs = set()
-
-
class _ConnectionFairy(object):
"""Proxies a DBAPI connection and provides return-on-dereference
if self._connection_record is not None:
rec = self._connection_record
- _refs.remove(rec)
rec.fairy_ref = None
rec.connection = None
# TODO: should this be _return_conn?
from . import assertsql
from . import config
+from . import engines
from . import mock
-from . import util as testutil
from .exclusions import db_spec
from .util import fail
from .. import exc as sa_exc
from .. import orm
-from .. import pool
from .. import schema
from .. import types as sqltypes
from .. import util
_assert_no_stray_pool_connections()
-_STRAY_CONNECTION_FAILURES = 0
-
-
def _assert_no_stray_pool_connections():
- global _STRAY_CONNECTION_FAILURES
-
- # lazy gc on cPython means "do nothing." pool connections
- # shouldn't be in cycles, should go away.
- testutil.lazy_gc()
-
- # however, once in awhile, on an EC2 machine usually,
- # there's a ref in there. usually just one.
- if pool._refs:
-
- # OK, let's be somewhat forgiving.
- _STRAY_CONNECTION_FAILURES += 1
-
- print(
- "Encountered a stray connection in test cleanup: %s"
- % str(pool._refs)
- )
- # then do a real GC sweep. We shouldn't even be here
- # so a single sweep should really be doing it, otherwise
- # there's probably a real unreachable cycle somewhere.
- testutil.gc_collect()
-
- # if we've already had two of these occurrences, or
- # after a hard gc sweep we still have pool._refs?!
- # now we have to raise.
- if pool._refs:
- err = str(pool._refs)
-
- # but clean out the pool refs collection directly,
- # reset the counter,
- # so the error doesn't at least keep happening.
- pool._refs.clear()
- _STRAY_CONNECTION_FAILURES = 0
- warnings.warn(
- "Stray connection refused to leave " "after gc.collect(): %s" % err
- )
- elif _STRAY_CONNECTION_FAILURES > 10:
- assert False, "Encountered more than 10 stray connections"
- _STRAY_CONNECTION_FAILURES = 0
+ engines.testing_reaper.assert_all_closed()
def eq_regex(a, b, msg=None):
import weakref
from . import config
-from . import uses_deprecated
from .util import decorator
from .. import event
from .. import pool
self.testing_engines = weakref.WeakKeyDictionary()
self.conns = set()
+ def add_pool(self, pool):
+ event.listen(pool, "connect", self.connect)
+ event.listen(pool, "checkout", self.checkout)
+ event.listen(pool, "invalidate", self.invalidate)
+
def add_engine(self, engine):
+ self.add_pool(engine.pool)
self.testing_engines[engine] = True
def connect(self, dbapi_conn, con_record):
else:
self._stop_test_ctx_aggressive()
- @uses_deprecated()
def _stop_test_ctx_minimal(self):
self.close_all()
if rec is not config.db:
rec.dispose()
- @uses_deprecated()
def _stop_test_ctx_aggressive(self):
self.close_all()
for conn, rec in list(self.conns):
engine.pool._timeout = 0
engine.pool._max_overflow = 0
if use_reaper:
- event.listen(engine.pool, "connect", testing_reaper.connect)
- event.listen(engine.pool, "checkout", testing_reaper.checkout)
- event.listen(engine.pool, "invalidate", testing_reaper.invalidate)
testing_reaper.add_engine(engine)
return engine
-from sqlalchemy import pool as pool_module
from sqlalchemy.pool import QueuePool
from sqlalchemy.testing import AssertsExecutionResults
from sqlalchemy.testing import fixtures
def close(self):
pass
- def teardown(self):
- # the tests leave some fake connections
- # around which don't necessarily
- # get gc'ed as quickly as we'd like all the time,
- # particularly for non-refcount gc
- pool_module._refs.clear()
-
def setup(self):
# create a throwaway pool which
# has the effect of initializing
def _do_testqueuepool(self, useclose=False):
p = self._queuepool_fixture(pool_size=3, max_overflow=-1)
+ reaper = testing.engines.ConnectionKiller()
+ reaper.add_pool(p)
def status(pool):
return (
lazy_gc()
self.assert_(status(p) == (3, 2, 0, 1))
c1.close()
- lazy_gc()
- assert not pool._refs
+
+ reaper.assert_all_closed()
def test_timeout_accessor(self):
expected_timeout = 123
assert t < 14, "Not all timeouts were < 14 seconds %r" % timeouts
def _test_overflow(self, thread_count, max_overflow):
- gc_collect()
+ reaper = testing.engines.ConnectionKiller()
dbapi = MockDBAPI()
mutex = threading.Lock()
p = pool.QueuePool(
creator=creator, pool_size=3, timeout=2, max_overflow=max_overflow
)
+ reaper.add_pool(p)
peaks = []
def whammy():
self.assert_(max(peaks) <= max_overflow)
- lazy_gc()
- assert not pool._refs
+ reaper.assert_all_closed()
def test_overflow_reset_on_failed_connect(self):
dbapi = Mock()