From: Daniele Varrazzo Date: Wed, 19 Nov 2025 23:13:11 +0000 (+0100) Subject: refactor(pool): add _close_connection() method X-Git-Tag: 3.3.0~18 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3dbd06329a5b1c84ece93fb84ccf459f02f2e704;p=thirdparty%2Fpsycopg.git refactor(pool): add _close_connection() method Improves on the solution for #1124 --- diff --git a/psycopg_pool/psycopg_pool/null_pool.py b/psycopg_pool/psycopg_pool/null_pool.py index 59a34d242..9ae119bc5 100644 --- a/psycopg_pool/psycopg_pool/null_pool.py +++ b/psycopg_pool/psycopg_pool/null_pool.py @@ -131,7 +131,7 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]): conn._pool = None if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: self._stats[self._RETURNS_BAD] += 1 - conn.close() + self._close_connection(conn) self._nconns -= 1 return True @@ -169,7 +169,7 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]): break else: # No client waiting for a connection: close the connection - conn.close() + self._close_connection(conn) # If we have been asked to wait for pool init, notify the # waiter if the pool is ready. if self._pool_full_event: diff --git a/psycopg_pool/psycopg_pool/null_pool_async.py b/psycopg_pool/psycopg_pool/null_pool_async.py index 8329c202d..c9b567c06 100644 --- a/psycopg_pool/psycopg_pool/null_pool_async.py +++ b/psycopg_pool/psycopg_pool/null_pool_async.py @@ -128,7 +128,7 @@ class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT]) conn._pool = None if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: self._stats[self._RETURNS_BAD] += 1 - await conn.close() + await self._close_connection(conn) self._nconns -= 1 return True @@ -166,7 +166,7 @@ class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT]) break else: # No client waiting for a connection: close the connection - await conn.close() + await self._close_connection(conn) # If we have been asked to wait for pool init, notify the # waiter if the pool is ready. if self._pool_full_event: diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index 619dad34e..e4a58c8d9 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -340,8 +340,7 @@ class ConnectionPool(Generic[CT], BasePool): if not self._closed: return False - conn._pool = None - conn.close() + self._close_connection(conn) return True def open(self, wait: bool = False, timeout: float = 30.0) -> None: @@ -448,8 +447,7 @@ class ConnectionPool(Generic[CT], BasePool): # Close the connections that were still in the pool for conn in connections: - conn._pool = None - conn.close() + self._close_connection(conn) # Signal to eventual clients in the queue that business is closed. for pos in waiting: @@ -521,8 +519,7 @@ class ConnectionPool(Generic[CT], BasePool): # Check for expired connections if conn._expire_at <= monotonic(): logger.info("discarding expired connection %s", conn) - conn._pool = None - conn.close() + self._close_connection(conn) self.run_task(AddConnection(self)) continue @@ -661,7 +658,7 @@ class ConnectionPool(Generic[CT], BasePool): try: conn = self._connect() except Exception as ex: - logger.warning(f"error connecting in {self.name!r}: {ex}") + logger.warning("error connecting in %r: %s", self.name, ex) if attempt.time_to_give_up(now): logger.warning( "reconnection attempt in pool %r failed after %s sec", @@ -718,8 +715,7 @@ class ConnectionPool(Generic[CT], BasePool): # Check if the connection is past its best before date if conn._expire_at <= monotonic(): logger.info("discarding expired connection") - conn._pool = None - conn.close() + self._close_connection(conn) self.run_task(AddConnection(self)) return @@ -744,7 +740,7 @@ class ConnectionPool(Generic[CT], BasePool): # between here and entering the lock. Therefore we will make another # check later. if self._closed: - conn.close() + self._close_connection(conn) return # Critical section: if there is a client waiting give it the connection @@ -755,7 +751,7 @@ class ConnectionPool(Generic[CT], BasePool): # this connection while the main process is closing the pool. # Now that we are in the critical section we know for real. if self._closed: - conn.close() + self._close_connection(conn) return while self._waiting: @@ -793,13 +789,11 @@ class ConnectionPool(Generic[CT], BasePool): ex, conn, ) - conn._pool = None - conn.close() + self._close_connection(conn) elif status == TransactionStatus.ACTIVE: # Connection returned during an operation. Bad... just close it. logger.warning("closing returned connection: %s", conn) - conn._pool = None - conn.close() + self._close_connection(conn) if self._reset: try: @@ -810,9 +804,12 @@ class ConnectionPool(Generic[CT], BasePool): f"connection left in status {sname} by reset function {self._reset}: discarded" ) except Exception as ex: - logger.warning(f"error resetting connection: {ex}") - conn._pool = None - conn.close() + logger.warning("error resetting connection: %s", ex) + self._close_connection(conn) + + def _close_connection(self, conn: CT) -> None: + conn._pool = None + conn.close() def _shrink_pool(self) -> None: to_close: CT | None = None @@ -836,8 +833,7 @@ class ConnectionPool(Generic[CT], BasePool): nconns_min, self.max_idle, ) - to_close._pool = None - to_close.close() + self._close_connection(to_close) def _get_measures(self) -> dict[str, int]: rv = super()._get_measures() diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index 58a943034..b3242af7e 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -373,8 +373,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): if not self._closed: return False - conn._pool = None - await conn.close() + await self._close_connection(conn) return True async def open(self, wait: bool = False, timeout: float = 30.0) -> None: @@ -488,8 +487,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): # Close the connections that were still in the pool for conn in connections: - conn._pool = None - await conn.close() + await self._close_connection(conn) # Signal to eventual clients in the queue that business is closed. for pos in waiting: @@ -561,8 +559,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): # Check for expired connections if conn._expire_at <= monotonic(): logger.info("discarding expired connection %s", conn) - conn._pool = None - await conn.close() + await self._close_connection(conn) self.run_task(AddConnection(self)) continue @@ -712,7 +709,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): try: conn = await self._connect() except Exception as ex: - logger.warning(f"error connecting in {self.name!r}: {ex}") + logger.warning("error connecting in %r: %s", self.name, ex) if attempt.time_to_give_up(now): logger.warning( "reconnection attempt in pool %r failed after %s sec", @@ -771,8 +768,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): # Check if the connection is past its best before date if conn._expire_at <= monotonic(): logger.info("discarding expired connection") - conn._pool = None - await conn.close() + await self._close_connection(conn) self.run_task(AddConnection(self)) return @@ -797,7 +793,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): # between here and entering the lock. Therefore we will make another # check later. if self._closed: - await conn.close() + await self._close_connection(conn) return # Critical section: if there is a client waiting give it the connection @@ -809,7 +805,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): # this connection while the main process is closing the pool. # Now that we are in the critical section we know for real. if self._closed: - await conn.close() + await self._close_connection(conn) return while self._waiting: @@ -848,14 +844,12 @@ class AsyncConnectionPool(Generic[ACT], BasePool): ex, conn, ) - conn._pool = None - await conn.close() + await self._close_connection(conn) elif status == TransactionStatus.ACTIVE: # Connection returned during an operation. Bad... just close it. logger.warning("closing returned connection: %s", conn) - conn._pool = None - await conn.close() + await self._close_connection(conn) if self._reset: try: @@ -867,9 +861,12 @@ class AsyncConnectionPool(Generic[ACT], BasePool): f" {self._reset}: discarded" ) except Exception as ex: - logger.warning(f"error resetting connection: {ex}") - conn._pool = None - await conn.close() + logger.warning("error resetting connection: %s", ex) + await self._close_connection(conn) + + async def _close_connection(self, conn: ACT) -> None: + conn._pool = None + await conn.close() async def _shrink_pool(self) -> None: to_close: ACT | None = None @@ -894,8 +891,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): nconns_min, self.max_idle, ) - to_close._pool = None - await to_close.close() + await self._close_connection(to_close) def _get_measures(self) -> dict[str, int]: rv = super()._get_measures()