]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor(pool): add _close_connection() method
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 19 Nov 2025 23:13:11 +0000 (00:13 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 21 Nov 2025 15:59:49 +0000 (16:59 +0100)
Improves on the solution for #1124

psycopg_pool/psycopg_pool/null_pool.py
psycopg_pool/psycopg_pool/null_pool_async.py
psycopg_pool/psycopg_pool/pool.py
psycopg_pool/psycopg_pool/pool_async.py

index 59a34d2421c0849e4d90894092cc1f9f4f33fa18..9ae119bc50285870a9925586b20b7d419244dac6 100644 (file)
@@ -131,7 +131,7 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]):
             conn._pool = None
             if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
                 self._stats[self._RETURNS_BAD] += 1
-            conn.close()
+            self._close_connection(conn)
             self._nconns -= 1
             return True
 
@@ -169,7 +169,7 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]):
                     break
             else:
                 # No client waiting for a connection: close the connection
-                conn.close()
+                self._close_connection(conn)
                 # If we have been asked to wait for pool init, notify the
                 # waiter if the pool is ready.
                 if self._pool_full_event:
index 8329c202d99cdc0ce4f632fee7b8a1aee191761c..c9b567c063c88feada185b630979d35f42d58cf4 100644 (file)
@@ -128,7 +128,7 @@ class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT])
             conn._pool = None
             if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
                 self._stats[self._RETURNS_BAD] += 1
-            await conn.close()
+            await self._close_connection(conn)
             self._nconns -= 1
             return True
 
@@ -166,7 +166,7 @@ class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT])
                     break
             else:
                 # No client waiting for a connection: close the connection
-                await conn.close()
+                await self._close_connection(conn)
                 # If we have been asked to wait for pool init, notify the
                 # waiter if the pool is ready.
                 if self._pool_full_event:
index 619dad34ebf59616a0db831c7f08f4de2376f9aa..e4a58c8d9c87516b3449dc7b5232012bd9b31ce3 100644 (file)
@@ -340,8 +340,7 @@ class ConnectionPool(Generic[CT], BasePool):
         if not self._closed:
             return False
 
-        conn._pool = None
-        conn.close()
+        self._close_connection(conn)
         return True
 
     def open(self, wait: bool = False, timeout: float = 30.0) -> None:
@@ -448,8 +447,7 @@ class ConnectionPool(Generic[CT], BasePool):
 
         # Close the connections that were still in the pool
         for conn in connections:
-            conn._pool = None
-            conn.close()
+            self._close_connection(conn)
 
         # Signal to eventual clients in the queue that business is closed.
         for pos in waiting:
@@ -521,8 +519,7 @@ class ConnectionPool(Generic[CT], BasePool):
             # Check for expired connections
             if conn._expire_at <= monotonic():
                 logger.info("discarding expired connection %s", conn)
-                conn._pool = None
-                conn.close()
+                self._close_connection(conn)
                 self.run_task(AddConnection(self))
                 continue
 
@@ -661,7 +658,7 @@ class ConnectionPool(Generic[CT], BasePool):
         try:
             conn = self._connect()
         except Exception as ex:
-            logger.warning(f"error connecting in {self.name!r}: {ex}")
+            logger.warning("error connecting in %r: %s", self.name, ex)
             if attempt.time_to_give_up(now):
                 logger.warning(
                     "reconnection attempt in pool %r failed after %s sec",
@@ -718,8 +715,7 @@ class ConnectionPool(Generic[CT], BasePool):
         # Check if the connection is past its best before date
         if conn._expire_at <= monotonic():
             logger.info("discarding expired connection")
-            conn._pool = None
-            conn.close()
+            self._close_connection(conn)
             self.run_task(AddConnection(self))
             return
 
@@ -744,7 +740,7 @@ class ConnectionPool(Generic[CT], BasePool):
         # between here and entering the lock. Therefore we will make another
         # check later.
         if self._closed:
-            conn.close()
+            self._close_connection(conn)
             return
 
         # Critical section: if there is a client waiting give it the connection
@@ -755,7 +751,7 @@ class ConnectionPool(Generic[CT], BasePool):
             # this connection while the main process is closing the pool.
             # Now that we are in the critical section we know for real.
             if self._closed:
-                conn.close()
+                self._close_connection(conn)
                 return
 
             while self._waiting:
@@ -793,13 +789,11 @@ class ConnectionPool(Generic[CT], BasePool):
                     ex,
                     conn,
                 )
-                conn._pool = None
-                conn.close()
+                self._close_connection(conn)
         elif status == TransactionStatus.ACTIVE:
             # Connection returned during an operation. Bad... just close it.
             logger.warning("closing returned connection: %s", conn)
-            conn._pool = None
-            conn.close()
+            self._close_connection(conn)
 
         if self._reset:
             try:
@@ -810,9 +804,12 @@ class ConnectionPool(Generic[CT], BasePool):
                         f"connection left in status {sname} by reset function {self._reset}: discarded"
                     )
             except Exception as ex:
-                logger.warning(f"error resetting connection: {ex}")
-                conn._pool = None
-                conn.close()
+                logger.warning("error resetting connection: %s", ex)
+                self._close_connection(conn)
+
+    def _close_connection(self, conn: CT) -> None:
+        conn._pool = None
+        conn.close()
 
     def _shrink_pool(self) -> None:
         to_close: CT | None = None
@@ -836,8 +833,7 @@ class ConnectionPool(Generic[CT], BasePool):
                 nconns_min,
                 self.max_idle,
             )
-            to_close._pool = None
-            to_close.close()
+            self._close_connection(to_close)
 
     def _get_measures(self) -> dict[str, int]:
         rv = super()._get_measures()
index 58a943034f6bd607d377df215a586b730d3cbe75..b3242af7eca467592129411e8ca9bba278d61f99 100644 (file)
@@ -373,8 +373,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         if not self._closed:
             return False
 
-        conn._pool = None
-        await conn.close()
+        await self._close_connection(conn)
         return True
 
     async def open(self, wait: bool = False, timeout: float = 30.0) -> None:
@@ -488,8 +487,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
 
         # Close the connections that were still in the pool
         for conn in connections:
-            conn._pool = None
-            await conn.close()
+            await self._close_connection(conn)
 
         # Signal to eventual clients in the queue that business is closed.
         for pos in waiting:
@@ -561,8 +559,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             # Check for expired connections
             if conn._expire_at <= monotonic():
                 logger.info("discarding expired connection %s", conn)
-                conn._pool = None
-                await conn.close()
+                await self._close_connection(conn)
                 self.run_task(AddConnection(self))
                 continue
 
@@ -712,7 +709,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         try:
             conn = await self._connect()
         except Exception as ex:
-            logger.warning(f"error connecting in {self.name!r}: {ex}")
+            logger.warning("error connecting in %r: %s", self.name, ex)
             if attempt.time_to_give_up(now):
                 logger.warning(
                     "reconnection attempt in pool %r failed after %s sec",
@@ -771,8 +768,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         # Check if the connection is past its best before date
         if conn._expire_at <= monotonic():
             logger.info("discarding expired connection")
-            conn._pool = None
-            await conn.close()
+            await self._close_connection(conn)
             self.run_task(AddConnection(self))
             return
 
@@ -797,7 +793,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         # between here and entering the lock. Therefore we will make another
         # check later.
         if self._closed:
-            await conn.close()
+            await self._close_connection(conn)
             return
 
         # Critical section: if there is a client waiting give it the connection
@@ -809,7 +805,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             # this connection while the main process is closing the pool.
             # Now that we are in the critical section we know for real.
             if self._closed:
-                await conn.close()
+                await self._close_connection(conn)
                 return
 
             while self._waiting:
@@ -848,14 +844,12 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
                     ex,
                     conn,
                 )
-                conn._pool = None
-                await conn.close()
+                await self._close_connection(conn)
 
         elif status == TransactionStatus.ACTIVE:
             # Connection returned during an operation. Bad... just close it.
             logger.warning("closing returned connection: %s", conn)
-            conn._pool = None
-            await conn.close()
+            await self._close_connection(conn)
 
         if self._reset:
             try:
@@ -867,9 +861,12 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
                         f" {self._reset}: discarded"
                     )
             except Exception as ex:
-                logger.warning(f"error resetting connection: {ex}")
-                conn._pool = None
-                await conn.close()
+                logger.warning("error resetting connection: %s", ex)
+                await self._close_connection(conn)
+
+    async def _close_connection(self, conn: ACT) -> None:
+        conn._pool = None
+        await conn.close()
 
     async def _shrink_pool(self) -> None:
         to_close: ACT | None = None
@@ -894,8 +891,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
                 nconns_min,
                 self.max_idle,
             )
-            to_close._pool = None
-            await to_close.close()
+            await self._close_connection(to_close)
 
     def _get_measures(self) -> dict[str, int]:
         rv = super()._get_measures()