From: Daniele Varrazzo Date: Fri, 6 Oct 2023 11:00:41 +0000 (+0200) Subject: refactor(pool): introduce a loop in getconn to allow connection check X-Git-Tag: pool-3.2.0~7^2~5 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=d1fa0bc2bdce374072aa1dad9da8a56ed880b276;p=thirdparty%2Fpsycopg.git refactor(pool): introduce a loop in getconn to allow connection check --- diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index 185cef08d..7c1cf6339 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -232,11 +232,34 @@ class ConnectionPool(Generic[CT], BasePool): failing to do so will deplete the pool. A depleted pool is a sad pool: you don't want a depleted pool. """ + t0 = monotonic() + if timeout is None: + timeout = self.timeout + deadline = t0 + timeout + logger.info("connection requested from %r", self.name) self._stats[self._REQUESTS_NUM] += 1 self._check_open_getconn() + try: + while True: + conn = self._getconn_unchecked(deadline - monotonic()) + try: + self._check_connection(conn) + except Exception: + self.putconn(conn) + else: + logger.info("connection given by %r", self.name) + return conn + # Re-raise the timeout exception presenting the user the global + # timeout, not the per-attempt one. + except PoolTimeout: + raise PoolTimeout( + f"couldn't get a connection after {timeout:.2f} sec" + ) from None + + def _getconn_unchecked(self, timeout: float) -> CT: # Critical section: decide here if there's a connection ready # or if the client needs to wait. with self._lock: @@ -254,8 +277,6 @@ class ConnectionPool(Generic[CT], BasePool): # If we are in the waiting queue, wait to be assigned a connection # (outside the critical section, so only the waiting client is locked) if not conn: - if timeout is None: - timeout = self.timeout try: conn = pos.wait(timeout=timeout) except Exception: @@ -269,7 +290,6 @@ class ConnectionPool(Generic[CT], BasePool): # Note that this property shouldn't be set while the connection is in # the pool, to avoid to create a reference loop. conn._pool = self - logger.info("connection given by %r", self.name) return conn def _get_ready_connection(self, timeout: Optional[float]) -> Optional[CT]: @@ -288,6 +308,9 @@ class ConnectionPool(Generic[CT], BasePool): ) return conn + def _check_connection(self, conn: CT) -> None: + pass + def _maybe_grow_pool(self) -> None: # Allow only one task at time to grow the pool (or returning # connections might be starved). diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index c1c371022..a75418392 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -252,11 +252,35 @@ class AsyncConnectionPool(Generic[ACT], BasePool): failing to do so will deplete the pool. A depleted pool is a sad pool: you don't want a depleted pool. """ + t0 = monotonic() + if timeout is None: + timeout = self.timeout + deadline = t0 + timeout + logger.info("connection requested from %r", self.name) self._stats[self._REQUESTS_NUM] += 1 self._check_open_getconn() + try: + while True: + conn = await self._getconn_unchecked(deadline - monotonic()) + try: + await self._check_connection(conn) + except Exception: + await self.putconn(conn) + else: + logger.info("connection given by %r", self.name) + return conn + + # Re-raise the timeout exception presenting the user the global + # timeout, not the per-attempt one. + except PoolTimeout: + raise PoolTimeout( + f"couldn't get a connection after {timeout:.2f} sec" + ) from None + + async def _getconn_unchecked(self, timeout: float) -> ACT: # Critical section: decide here if there's a connection ready # or if the client needs to wait. async with self._lock: @@ -274,8 +298,6 @@ class AsyncConnectionPool(Generic[ACT], BasePool): # If we are in the waiting queue, wait to be assigned a connection # (outside the critical section, so only the waiting client is locked) if not conn: - if timeout is None: - timeout = self.timeout try: conn = await pos.wait(timeout=timeout) except Exception: @@ -289,7 +311,6 @@ class AsyncConnectionPool(Generic[ACT], BasePool): # Note that this property shouldn't be set while the connection is in # the pool, to avoid to create a reference loop. conn._pool = self - logger.info("connection given by %r", self.name) return conn async def _get_ready_connection(self, timeout: Optional[float]) -> Optional[ACT]: @@ -308,6 +329,9 @@ class AsyncConnectionPool(Generic[ACT], BasePool): ) return conn + async def _check_connection(self, conn: ACT) -> None: + pass + def _maybe_grow_pool(self) -> None: # Allow only one task at time to grow the pool (or returning # connections might be starved).