@testing.variation("is_asyncio", [(True, testing.requires.asyncio), False])
@testing.variation("has_terminate", [True, False])
- def test_checkin_event_gc(self, is_asyncio, has_terminate):
+ @testing.variation("invalidate_conn_rec", [True, False])
+ def test_checkin_event_gc(
+ self, is_asyncio, has_terminate, invalidate_conn_rec
+ ):
"""tests for #8419, which have been modified for 2.0 in #9237"""
p, canary = self._checkin_event_fixture(
eq_(canary, [])
- if is_asyncio:
+ if invalidate_conn_rec:
+ # test #10414
+ c1._connection_record.invalidate()
+
+ if is_asyncio and not invalidate_conn_rec:
if has_terminate:
with expect_warnings(
"The garbage collector is trying to clean up.*which will "
detach_gced = is_asyncio
- if detach_gced:
+ if invalidate_conn_rec:
+ eq_(canary, ["checkin"])
+ elif detach_gced:
if has_terminate:
eq_(canary, ["reset_no_rollback", "detach", "close_detached"])
else:
eq_(canary, ["reset_rollback_ok", "checkin"])
gc_collect()
- if detach_gced:
+ if detach_gced or invalidate_conn_rec:
is_none(dbapi_connection())
else:
is_not_none(dbapi_connection())
"detach_gced",
[("detached_gc", testing.requires.asyncio), "normal_gc"],
)
+ @testing.variation("invalidate_conn_rec", [True, False])
@testing.emits_warning("The garbage collector")
- def test_userspace_disconnectionerror_weakref_finalizer(self, detach_gced):
+ def test_userspace_disconnectionerror_weakref_finalizer(
+ self, detach_gced, invalidate_conn_rec
+ ):
dbapi, pool = self._queuepool_dbapi_fixture(
pool_size=1, max_overflow=2, _is_asyncio=detach_gced
)
)
not_closed_dbapi_conn = conn.dbapi_connection
+
+ if invalidate_conn_rec:
+ conn._connection_record.invalidate()
+
del conn
gc_collect()
# this creates a gc-level warning that is not easy to pin down,
# hence we use the testing.emits_warning() decorator just to squash
# it
- eq_(not_closed_dbapi_conn.mock_calls, [call.rollback()])
+
+ if invalidate_conn_rec:
+ eq_(not_closed_dbapi_conn.mock_calls, [call.close()])
+ else:
+ eq_(not_closed_dbapi_conn.mock_calls, [call.rollback()])
@testing.requires.timing_intensive
def test_recycle_pool_no_race(self):