--- /dev/null
+.. change::
+ :tags: bug, pool, pypy
+ :tickets: 5842
+
+ Fixed issue where connection pool would not return connections to the pool
+ or otherwise be finalized upon garbage collection under pypy if the checked
+ out connection fell out of scope without being closed. This is a long
+ standing issue due to pypy's difference in GC behavior that does not call
+ weakref finalizers if they are relative to another object that is also
+ being garbage collected. A strong reference to the related record is now
+ maintained so that the weakref has a strong-referenced "base" to trigger
+ off of.
Operator Customization
^^^^^^^^^^^^^^^^^^^^^^
+TODO
+
+.. Setup code, not for display
+
+ >>> conn.close()
+ ROLLBACK
\ No newline at end of file
Types Reference: :ref:`types_toplevel`
+
+.. Setup code, not for display
+
+ >>> conn.close()
matching objects locally present in the :class:`_orm.Session`. See the section
:ref:`orm_expression_update_delete` for background on this feature.
+
+.. Setup code, not for display
+
+ >>> conn.close()
+ ROLLBACK
\ No newline at end of file
Relationship Reference: :ref:`relationship_config_toplevel`
Session Reference: :doc:`/orm/session`
+
+
+.. Setup code, not for display
+
+ >>> session.close()
+ ROLLBACK
\ No newline at end of file
from .. import exc
from .. import log
from .. import util
-from ..util import threading
reset_rollback = util.symbol("reset_rollback")
self._orig_logging_name = None
log.instance_logger(self, echoflag=echo)
- self._threadconns = threading.local()
self._creator = creator
self._recycle = recycle
self._invalidate_time = 0
dbapi_connection = rec.get_connection()
except Exception as err:
with util.safe_reraise():
- rec._checkin_failed(err)
+ rec._checkin_failed(err, _fairy_was_created=False)
echo = pool._should_log_debug()
fairy = _ConnectionFairy(dbapi_connection, rec, echo)
- rec.fairy_ref = weakref.ref(
+ rec.fairy_ref = ref = weakref.ref(
fairy,
lambda ref: _finalize_fairy
and _finalize_fairy(None, rec, pool, ref, echo),
)
+ _strong_ref_connection_records[ref] = rec
if echo:
pool.logger.debug(
"Connection %r checked out from pool", dbapi_connection
)
return fairy
- def _checkin_failed(self, err):
+ def _checkin_failed(self, err, _fairy_was_created=True):
self.invalidate(e=err)
- self.checkin(_no_fairy_ref=True)
+ self.checkin(
+ _fairy_was_created=_fairy_was_created,
+ )
- def checkin(self, _no_fairy_ref=False):
- if self.fairy_ref is None and not _no_fairy_ref:
+ def checkin(self, _fairy_was_created=True):
+ if self.fairy_ref is None and _fairy_was_created:
+ # _fairy_was_created is False for the initial get connection phase;
+ # meaning there was no _ConnectionFairy and we must unconditionally
+ # do a checkin.
+ #
+ # otherwise, if fairy_was_created==True, if fairy_ref is None here
+ # that means we were checked in already, so this looks like
+ # a double checkin.
util.warn("Double checkin attempted on %s" % self)
return
self.fairy_ref = None
finalizer(connection)
if pool.dispatch.checkin:
pool.dispatch.checkin(connection, self)
+
pool._return_conn(self)
@property
"""
+ if ref:
+ _strong_ref_connection_records.pop(ref, None)
+ elif fairy:
+ _strong_ref_connection_records.pop(weakref.ref(fairy), None)
+
if ref is not None:
if connection_record.fairy_ref is not ref:
return
connection_record.checkin()
+# a dictionary of the _ConnectionFairy weakrefs to _ConnectionRecord, so that
+# GC under pypy will call ConnectionFairy finalizers. linked directly to the
+# weakref that will empty itself when collected so that it should not create
+# any unmanaged memory references.
+_strong_ref_connection_records = {}
+
+
class _ConnectionFairy(object):
"""Proxies a DBAPI connection and provides return-on-dereference
)
except Exception as err:
with util.safe_reraise():
- fairy._connection_record._checkin_failed(err)
+ fairy._connection_record._checkin_failed(
+ err,
+ _fairy_was_created=True,
+ )
+
+ # prevent _ConnectionFairy from being carried
+ # in the stack trace. Do this after the
+ # connection record has been checked in, so that
+ # if the del triggers a finalize fairy, it won't
+ # try to checkin a second time.
+ del fairy
attempts -= 1
is_(a, None, msg=msg)
+def is_not_none(a, msg=None):
+ is_not(a, None, msg=msg)
+
+
def is_true(a, msg=None):
is_(bool(a), True, msg=msg)
is_(bool(a), False, msg=msg)
-def is_none(a, msg=None):
- is_(a, None, msg=msg)
-
-
-def is_not_none(a, msg=None):
- is_not(a, None, msg=msg)
-
-
def is_(a, b, msg=None):
"""Assert a is b, with repr messaging on failure."""
assert a is b, msg or "%r is not %r" % (a, b)
from . import config
from .util import decorator
+from .util import gc_collect
from .. import event
from .. import pool
self._drop_testing_engines("function")
self._drop_testing_engines("class")
+ def stop_test_class_outside_fixtures(self):
+ # ensure no refs to checked out connections at all.
+
+ if pool.base._strong_ref_connection_records:
+ gc_collect()
+
+ if pool.base._strong_ref_connection_records:
+ ln = len(pool.base._strong_ref_connection_records)
+ pool.base._strong_ref_connection_records.clear()
+ assert (
+ False
+ ), "%d connection recs not cleared after test suite" % (ln)
+
def final_cleanup(self):
self.checkin_all()
for scope in self.testing_engines:
def stop_test_class_outside_fixtures(cls):
+ engines.testing_reaper.stop_test_class_outside_fixtures()
provision.stop_test_class_outside_fixtures(config, config.db, cls)
try:
if not options.low_connections:
d2.pop(k, None)
for k in (
- "_threadconns",
"_invoke_creator",
"_pool",
"_overflow_lock",
assert canary.call_args_list[0][0][2] is exc
@testing.combinations((True, testing.requires.python3), (False,))
- @testing.requires.predictable_gc
def test_checkin_event_gc(self, detach_gced):
p, canary = self._checkin_event_fixture()
def _do_testqueuepool(self, useclose=False):
p = self._queuepool_fixture(pool_size=3, max_overflow=-1)
- reaper = testing.engines.ConnectionKiller()
- reaper.add_pool(p)
def status(pool):
return (
else:
c4 = c3 = c2 = None
lazy_gc()
- self.assert_(status(p) == (3, 3, 3, 3))
+ eq_(status(p), (3, 3, 3, 3))
if useclose:
c1.close()
c5.close()
self.assert_(status(p) == (3, 2, 0, 1))
c1.close()
- reaper.assert_all_closed()
-
def test_timeout_accessor(self):
expected_timeout = 123
p = self._queuepool_fixture(timeout=expected_timeout)
dbapi.shutdown(True)
assert_raises_context_ok(Exception, p.connect)
eq_(p._overflow, 0)
+
eq_(p.checkedout(), 0) # and not 1
dbapi.shutdown(False)
self._assert_cleanup_on_pooled_reconnect(dbapi, p)
@testing.combinations((True, testing.requires.python3), (False,))
- @testing.requires.predictable_gc
def test_userspace_disconnectionerror_weakref_finalizer(self, detach_gced):
dbapi, pool = self._queuepool_dbapi_fixture(
pool_size=1, max_overflow=2
def test_pre_ping_db_stays_shutdown(self):
engine = engines.reconnecting_engine(options={"pool_pre_ping": True})
+ if isinstance(engine.pool, pool.QueuePool):
+ eq_(engine.pool.checkedin(), 0)
+ eq_(engine.pool._overflow, -5)
+
conn = engine.connect()
eq_(conn.execute(select(1)).scalar(), 1)
conn.close()
+ if isinstance(engine.pool, pool.QueuePool):
+ eq_(engine.pool.checkedin(), 1)
+ eq_(engine.pool._overflow, -4)
+
engine.test_shutdown(stop=True)
assert_raises(exc.DBAPIError, engine.connect)
+ if isinstance(engine.pool, pool.QueuePool):
+ eq_(engine.pool.checkedin(), 1)
+ eq_(engine.pool._overflow, -4)
+
class InvalidateDuringResultTest(fixtures.TestBase):
__backend__ = True