]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Create explicit GC ordering between ConnectionFairy/ConnectionRecord
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 14 Jan 2021 23:07:14 +0000 (18:07 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 15 Jan 2021 05:29:12 +0000 (00:29 -0500)
Fixed issue where connection pool would not return connections to the pool
or otherwise be finalized upon garbage collection under pypy if the checked
out connection fell out of scope without being closed.   This is a long
standing issue due to pypy's difference in GC behavior that does not call
weakref finalizers if they are relative to another object that is also
being garbage collected.  A strong reference to the related record is now
maintained so that the weakref has a strong-referenced "base" to trigger
off of.

Fixes: #5842
Change-Id: Id5448fdacb6cceaac1ea40b2fbc851f052ed8e86

doc/build/changelog/unreleased_14/pypy_gc_pool.rst [new file with mode: 0644]
doc/build/core/operators.rst
doc/build/core/tutorial.rst
doc/build/orm/queryguide.rst
doc/build/orm/tutorial.rst
lib/sqlalchemy/pool/base.py
lib/sqlalchemy/testing/assertions.py
lib/sqlalchemy/testing/engines.py
lib/sqlalchemy/testing/plugin/plugin_base.py
test/engine/test_pool.py
test/engine/test_reconnect.py

diff --git a/doc/build/changelog/unreleased_14/pypy_gc_pool.rst b/doc/build/changelog/unreleased_14/pypy_gc_pool.rst
new file mode 100644 (file)
index 0000000..d1fca40
--- /dev/null
@@ -0,0 +1,12 @@
+.. change::
+    :tags: bug, pool, pypy
+    :tickets: 5842
+
+    Fixed issue where connection pool would not return connections to the pool
+    or otherwise be finalized upon garbage collection under pypy if the checked
+    out connection fell out of scope without being closed.   This is a long
+    standing issue due to pypy's difference in GC behavior that does not call
+    weakref finalizers if they are relative to another object that is also
+    being garbage collected.  A strong reference to the related record is now
+    maintained so that the weakref has a strong-referenced "base" to trigger
+    off of.
index ef821c8d0474e1fe2e20838c8e49166b68d5befe..36ed5ecf6fdaf7d99738e6472887b93454d00ced 100644 (file)
@@ -659,4 +659,10 @@ The above conjunction functions :func:`_sql.and_`, :func:`_sql.or_`,
 Operator Customization
 ^^^^^^^^^^^^^^^^^^^^^^
 
+TODO
 
+
+..  Setup code, not for display
+
+    >>> conn.close()
+    ROLLBACK
\ No newline at end of file
index e927a77ceaa78556b632fa573db3aced216e17d4..14782742e064ed4c8f827eaaa049c85d6f7933e6 100644 (file)
@@ -2599,3 +2599,7 @@ Connection Reference: :ref:`connections_toplevel`
 Types Reference: :ref:`types_toplevel`
 
 
+
+..  Setup code, not for display
+
+    >>> conn.close()
index faa1efdef685744e24a5272471fdacdd3ca542d1..5678d7cc73fefc08d504f17d6de5554c9689de96 100644 (file)
@@ -898,3 +898,8 @@ any number of database rows while also being able to synchronize the state of
 matching objects locally present in the :class:`_orm.Session`. See the section
 :ref:`orm_expression_update_delete` for background on this feature.
 
+
+..  Setup code, not for display
+
+    >>> conn.close()
+    ROLLBACK
\ No newline at end of file
index ae62d256a155b441afd5ec7284624e95ed855337..59cc9cfab2a680ed7c123907601ec17e9381f9ec 100644 (file)
@@ -2243,3 +2243,9 @@ Mapper Reference: :ref:`mapper_config_toplevel`
 Relationship Reference: :ref:`relationship_config_toplevel`
 
 Session Reference: :doc:`/orm/session`
+
+
+..  Setup code, not for display
+
+    >>> session.close()
+    ROLLBACK
\ No newline at end of file
index 6c3aad037fe82c99d81149781b0ec9906da302b0..47d9e2cbad7728b2ee47c08735d49fe4164cf3e0 100644 (file)
@@ -18,7 +18,6 @@ from .. import event
 from .. import exc
 from .. import log
 from .. import util
-from ..util import threading
 
 
 reset_rollback = util.symbol("reset_rollback")
@@ -172,7 +171,6 @@ class Pool(log.Identified):
             self._orig_logging_name = None
 
         log.instance_logger(self, echoflag=echo)
-        self._threadconns = threading.local()
         self._creator = creator
         self._recycle = recycle
         self._invalidate_time = 0
@@ -423,27 +421,37 @@ class _ConnectionRecord(object):
             dbapi_connection = rec.get_connection()
         except Exception as err:
             with util.safe_reraise():
-                rec._checkin_failed(err)
+                rec._checkin_failed(err, _fairy_was_created=False)
         echo = pool._should_log_debug()
         fairy = _ConnectionFairy(dbapi_connection, rec, echo)
 
-        rec.fairy_ref = weakref.ref(
+        rec.fairy_ref = ref = weakref.ref(
             fairy,
             lambda ref: _finalize_fairy
             and _finalize_fairy(None, rec, pool, ref, echo),
         )
+        _strong_ref_connection_records[ref] = rec
         if echo:
             pool.logger.debug(
                 "Connection %r checked out from pool", dbapi_connection
             )
         return fairy
 
-    def _checkin_failed(self, err):
+    def _checkin_failed(self, err, _fairy_was_created=True):
         self.invalidate(e=err)
-        self.checkin(_no_fairy_ref=True)
+        self.checkin(
+            _fairy_was_created=_fairy_was_created,
+        )
 
-    def checkin(self, _no_fairy_ref=False):
-        if self.fairy_ref is None and not _no_fairy_ref:
+    def checkin(self, _fairy_was_created=True):
+        if self.fairy_ref is None and _fairy_was_created:
+            # _fairy_was_created is False for the initial get connection phase;
+            # meaning there was no _ConnectionFairy and we must unconditionally
+            # do a checkin.
+            #
+            # otherwise, if fairy_was_created==True, if fairy_ref is None here
+            # that means we were checked in already, so this looks like
+            # a double checkin.
             util.warn("Double checkin attempted on %s" % self)
             return
         self.fairy_ref = None
@@ -454,6 +462,7 @@ class _ConnectionRecord(object):
             finalizer(connection)
         if pool.dispatch.checkin:
             pool.dispatch.checkin(connection, self)
+
         pool._return_conn(self)
 
     @property
@@ -604,6 +613,11 @@ def _finalize_fairy(
 
     """
 
+    if ref:
+        _strong_ref_connection_records.pop(ref, None)
+    elif fairy:
+        _strong_ref_connection_records.pop(weakref.ref(fairy), None)
+
     if ref is not None:
         if connection_record.fairy_ref is not ref:
             return
@@ -663,6 +677,13 @@ def _finalize_fairy(
         connection_record.checkin()
 
 
+# a dictionary of the _ConnectionFairy weakrefs to _ConnectionRecord, so that
+# GC under pypy will call ConnectionFairy finalizers.  linked directly to the
+# weakref that will empty itself when collected so that it should not create
+# any unmanaged memory references.
+_strong_ref_connection_records = {}
+
+
 class _ConnectionFairy(object):
 
     """Proxies a DBAPI connection and provides return-on-dereference
@@ -797,7 +818,17 @@ class _ConnectionFairy(object):
                     )
                 except Exception as err:
                     with util.safe_reraise():
-                        fairy._connection_record._checkin_failed(err)
+                        fairy._connection_record._checkin_failed(
+                            err,
+                            _fairy_was_created=True,
+                        )
+
+                        # prevent _ConnectionFairy from being carried
+                        # in the stack trace.  Do this after the
+                        # connection record has been checked in, so that
+                        # if the del triggers a finalize fairy, it won't
+                        # try to checkin a second time.
+                        del fairy
 
                 attempts -= 1
 
index 40549f54c5ddcceec49c71dbe4b5727a69dcef63..f2ed91b79376daf8d8bc78acbf90b03aa306b79f 100644 (file)
@@ -228,6 +228,10 @@ def is_none(a, msg=None):
     is_(a, None, msg=msg)
 
 
+def is_not_none(a, msg=None):
+    is_not(a, None, msg=msg)
+
+
 def is_true(a, msg=None):
     is_(bool(a), True, msg=msg)
 
@@ -236,14 +240,6 @@ def is_false(a, msg=None):
     is_(bool(a), False, msg=msg)
 
 
-def is_none(a, msg=None):
-    is_(a, None, msg=msg)
-
-
-def is_not_none(a, msg=None):
-    is_not(a, None, msg=msg)
-
-
 def is_(a, b, msg=None):
     """Assert a is b, with repr messaging on failure."""
     assert a is b, msg or "%r is not %r" % (a, b)
index 8b334fde20e635a59d314b369750471351f5ef63..a313c298a522eac1f4104b9332d209229d4c16e4 100644 (file)
@@ -14,6 +14,7 @@ import weakref
 
 from . import config
 from .util import decorator
+from .util import gc_collect
 from .. import event
 from .. import pool
 
@@ -124,6 +125,19 @@ class ConnectionKiller(object):
         self._drop_testing_engines("function")
         self._drop_testing_engines("class")
 
+    def stop_test_class_outside_fixtures(self):
+        # ensure no refs to checked out connections at all.
+
+        if pool.base._strong_ref_connection_records:
+            gc_collect()
+
+            if pool.base._strong_ref_connection_records:
+                ln = len(pool.base._strong_ref_connection_records)
+                pool.base._strong_ref_connection_records.clear()
+                assert (
+                    False
+                ), "%d connection recs not cleared after test suite" % (ln)
+
     def final_cleanup(self):
         self.checkin_all()
         for scope in self.testing_engines:
index 7851fbb3ece67429cb8d444fdcf799c6f50083da..858814f914fd1c84cf2744d0cdfe8c309ff32282 100644 (file)
@@ -586,6 +586,7 @@ def stop_test_class(cls):
 
 
 def stop_test_class_outside_fixtures(cls):
+    engines.testing_reaper.stop_test_class_outside_fixtures()
     provision.stop_test_class_outside_fixtures(config, config.db, cls)
     try:
         if not options.low_connections:
index decdce3f9bec5f1afb8b45b284259059f0a596a3..b43a29fae84dda841ea385df343ce98baaa0bcac 100644 (file)
@@ -259,7 +259,6 @@ class PoolTest(PoolTestBase):
             d2.pop(k, None)
 
         for k in (
-            "_threadconns",
             "_invoke_creator",
             "_pool",
             "_overflow_lock",
@@ -639,7 +638,6 @@ class PoolEventsTest(PoolTestBase):
         assert canary.call_args_list[0][0][2] is exc
 
     @testing.combinations((True, testing.requires.python3), (False,))
-    @testing.requires.predictable_gc
     def test_checkin_event_gc(self, detach_gced):
         p, canary = self._checkin_event_fixture()
 
@@ -848,8 +846,6 @@ class QueuePoolTest(PoolTestBase):
 
     def _do_testqueuepool(self, useclose=False):
         p = self._queuepool_fixture(pool_size=3, max_overflow=-1)
-        reaper = testing.engines.ConnectionKiller()
-        reaper.add_pool(p)
 
         def status(pool):
             return (
@@ -878,7 +874,7 @@ class QueuePoolTest(PoolTestBase):
         else:
             c4 = c3 = c2 = None
             lazy_gc()
-        self.assert_(status(p) == (3, 3, 3, 3))
+        eq_(status(p), (3, 3, 3, 3))
         if useclose:
             c1.close()
             c5.close()
@@ -898,8 +894,6 @@ class QueuePoolTest(PoolTestBase):
         self.assert_(status(p) == (3, 2, 0, 1))
         c1.close()
 
-        reaper.assert_all_closed()
-
     def test_timeout_accessor(self):
         expected_timeout = 123
         p = self._queuepool_fixture(timeout=expected_timeout)
@@ -1391,6 +1385,7 @@ class QueuePoolTest(PoolTestBase):
             dbapi.shutdown(True)
             assert_raises_context_ok(Exception, p.connect)
             eq_(p._overflow, 0)
+
             eq_(p.checkedout(), 0)  # and not 1
 
             dbapi.shutdown(False)
@@ -1520,7 +1515,6 @@ class QueuePoolTest(PoolTestBase):
         self._assert_cleanup_on_pooled_reconnect(dbapi, p)
 
     @testing.combinations((True, testing.requires.python3), (False,))
-    @testing.requires.predictable_gc
     def test_userspace_disconnectionerror_weakref_finalizer(self, detach_gced):
         dbapi, pool = self._queuepool_dbapi_fixture(
             pool_size=1, max_overflow=2
index 7a64b25508b6e72c5c85147a0894aad510dba498..19b9a84a495c8b74737eacc12afaaac2d3a55c64 100644 (file)
@@ -1324,14 +1324,26 @@ class PrePingRealTest(fixtures.TestBase):
     def test_pre_ping_db_stays_shutdown(self):
         engine = engines.reconnecting_engine(options={"pool_pre_ping": True})
 
+        if isinstance(engine.pool, pool.QueuePool):
+            eq_(engine.pool.checkedin(), 0)
+            eq_(engine.pool._overflow, -5)
+
         conn = engine.connect()
         eq_(conn.execute(select(1)).scalar(), 1)
         conn.close()
 
+        if isinstance(engine.pool, pool.QueuePool):
+            eq_(engine.pool.checkedin(), 1)
+            eq_(engine.pool._overflow, -4)
+
         engine.test_shutdown(stop=True)
 
         assert_raises(exc.DBAPIError, engine.connect)
 
+        if isinstance(engine.pool, pool.QueuePool):
+            eq_(engine.pool.checkedin(), 1)
+            eq_(engine.pool._overflow, -4)
+
 
 class InvalidateDuringResultTest(fixtures.TestBase):
     __backend__ = True