]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor(pool): introduce a loop in getconn to allow connection check
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 6 Oct 2023 11:00:41 +0000 (13:00 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 14 Oct 2023 07:40:20 +0000 (09:40 +0200)
psycopg_pool/psycopg_pool/pool.py
psycopg_pool/psycopg_pool/pool_async.py

index 185cef08d881aa94a0f8401194d411c9d97eff5d..7c1cf63398825f663ae2078ca9fb1c2dfc7962e6 100644 (file)
@@ -232,11 +232,34 @@ class ConnectionPool(Generic[CT], BasePool):
         failing to do so will deplete the pool. A depleted pool is a sad pool:
         you don't want a depleted pool.
         """
+        t0 = monotonic()
+        if timeout is None:
+            timeout = self.timeout
+        deadline = t0 + timeout
+
         logger.info("connection requested from %r", self.name)
         self._stats[self._REQUESTS_NUM] += 1
 
         self._check_open_getconn()
 
+        try:
+            while True:
+                conn = self._getconn_unchecked(deadline - monotonic())
+                try:
+                    self._check_connection(conn)
+                except Exception:
+                    self.putconn(conn)
+                else:
+                    logger.info("connection given by %r", self.name)
+                    return conn
+        # Re-raise the timeout exception presenting the user the global
+        # timeout, not the per-attempt one.
+        except PoolTimeout:
+            raise PoolTimeout(
+                f"couldn't get a connection after {timeout:.2f} sec"
+            ) from None
+
+    def _getconn_unchecked(self, timeout: float) -> CT:
         # Critical section: decide here if there's a connection ready
         # or if the client needs to wait.
         with self._lock:
@@ -254,8 +277,6 @@ class ConnectionPool(Generic[CT], BasePool):
         # If we are in the waiting queue, wait to be assigned a connection
         # (outside the critical section, so only the waiting client is locked)
         if not conn:
-            if timeout is None:
-                timeout = self.timeout
             try:
                 conn = pos.wait(timeout=timeout)
             except Exception:
@@ -269,7 +290,6 @@ class ConnectionPool(Generic[CT], BasePool):
         # Note that this property shouldn't be set while the connection is in
         # the pool, to avoid to create a reference loop.
         conn._pool = self
-        logger.info("connection given by %r", self.name)
         return conn
 
     def _get_ready_connection(self, timeout: Optional[float]) -> Optional[CT]:
@@ -288,6 +308,9 @@ class ConnectionPool(Generic[CT], BasePool):
             )
         return conn
 
+    def _check_connection(self, conn: CT) -> None:
+        pass
+
     def _maybe_grow_pool(self) -> None:
         # Allow only one task at time to grow the pool (or returning
         # connections might be starved).
index c1c371022d1a179f8db6f5575414e4fe593e2eaf..a7541839236d526f13f79ff9d64132b595fe99c9 100644 (file)
@@ -252,11 +252,35 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         failing to do so will deplete the pool. A depleted pool is a sad pool:
         you don't want a depleted pool.
         """
+        t0 = monotonic()
+        if timeout is None:
+            timeout = self.timeout
+        deadline = t0 + timeout
+
         logger.info("connection requested from %r", self.name)
         self._stats[self._REQUESTS_NUM] += 1
 
         self._check_open_getconn()
 
+        try:
+            while True:
+                conn = await self._getconn_unchecked(deadline - monotonic())
+                try:
+                    await self._check_connection(conn)
+                except Exception:
+                    await self.putconn(conn)
+                else:
+                    logger.info("connection given by %r", self.name)
+                    return conn
+
+        # Re-raise the timeout exception presenting the user the global
+        # timeout, not the per-attempt one.
+        except PoolTimeout:
+            raise PoolTimeout(
+                f"couldn't get a connection after {timeout:.2f} sec"
+            ) from None
+
+    async def _getconn_unchecked(self, timeout: float) -> ACT:
         # Critical section: decide here if there's a connection ready
         # or if the client needs to wait.
         async with self._lock:
@@ -274,8 +298,6 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         # If we are in the waiting queue, wait to be assigned a connection
         # (outside the critical section, so only the waiting client is locked)
         if not conn:
-            if timeout is None:
-                timeout = self.timeout
             try:
                 conn = await pos.wait(timeout=timeout)
             except Exception:
@@ -289,7 +311,6 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         # Note that this property shouldn't be set while the connection is in
         # the pool, to avoid to create a reference loop.
         conn._pool = self
-        logger.info("connection given by %r", self.name)
         return conn
 
     async def _get_ready_connection(self, timeout: Optional[float]) -> Optional[ACT]:
@@ -308,6 +329,9 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             )
         return conn
 
+    async def _check_connection(self, conn: ACT) -> None:
+        pass
+
     def _maybe_grow_pool(self) -> None:
         # Allow only one task at time to grow the pool (or returning
         # connections might be starved).