From: Daniele Varrazzo Date: Fri, 29 Dec 2023 23:41:07 +0000 (+0100) Subject: fix(pool): use an exponential backoff to separate failing checks X-Git-Tag: pool-3.2.1~6^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6a6f0acff0a7926beb7f29f960249cd5fe3ae24f;p=thirdparty%2Fpsycopg.git fix(pool): use an exponential backoff to separate failing checks This is a consistent behaviour with with the exponential backoff used in reconnection attempts, and actually we could reuse the same object implementing the backoff + jitter. See also #709 --- diff --git a/docs/news_pool.rst b/docs/news_pool.rst index ea969810e..84c202077 100644 --- a/docs/news_pool.rst +++ b/docs/news_pool.rst @@ -13,8 +13,9 @@ Future releases psycopg_pool 3.2.1 (unreleased) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -- Respect timeout on `~ConnectionPool.connection()` when `!check` fails - (:ticket:`#709`). +- Respect the `!timeout` parameter on `~ConnectionPool.connection()` when + `!check` fails. Also avoid a busy-loop of checking; separate check attempts + using an exponential backoff (:ticket:`#709`). - Use `typing.Self` as a more correct return value annotation of context managers and other self-returning methods (see :ticket:`708`). diff --git a/psycopg_pool/psycopg_pool/_acompat.py b/psycopg_pool/psycopg_pool/_acompat.py index d6417590b..4e4fa20b0 100644 --- a/psycopg_pool/psycopg_pool/_acompat.py +++ b/psycopg_pool/psycopg_pool/_acompat.py @@ -10,6 +10,7 @@ when generating the sync version. from __future__ import annotations +import time import queue import asyncio import logging @@ -28,6 +29,7 @@ Event = threading.Event Condition = threading.Condition Lock = threading.RLock ALock = asyncio.Lock +sleep = time.sleep Worker: TypeAlias = threading.Thread AWorker: TypeAlias = "asyncio.Task[None]" @@ -162,3 +164,10 @@ def gather(*tasks: threading.Thread, timeout: float | None = None) -> None: if not t.is_alive(): continue logger.warning("couldn't stop thread %r within %s seconds", t.name, timeout) + + +def asleep(seconds: float) -> Coroutine[Any, Any, None]: + """ + Equivalent to asyncio.sleep(), converted to time.sleep() by async_to_sync. + """ + return asyncio.sleep(seconds) diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index 0b2d7ea33..90680d16f 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -28,7 +28,7 @@ from .base import ConnectionAttempt, BasePool from .errors import PoolClosed, PoolTimeout, TooManyRequests from ._compat import Deque, Self from ._acompat import Condition, Event, Lock, Queue, Worker, spawn, gather -from ._acompat import current_thread_name +from ._acompat import sleep, current_thread_name from .sched import Scheduler @@ -197,15 +197,7 @@ class ConnectionPool(Generic[CT], BasePool): self._check_open_getconn() try: - while True: - conn = self._getconn_unchecked(deadline - monotonic()) - try: - self._check_connection(conn) - except Exception: - self._putconn(conn, from_getconn=True) - else: - logger.info("connection given by %r", self.name) - return conn + return self._getconn_with_check_loop(deadline) # Re-raise the timeout exception presenting the user the global # timeout, not the per-attempt one. except PoolTimeout: @@ -213,6 +205,32 @@ class ConnectionPool(Generic[CT], BasePool): f"couldn't get a connection after {timeout:.2f} sec" ) from None + def _getconn_with_check_loop(self, deadline: float) -> CT: + attempt: ConnectionAttempt | None = None + + while True: + conn = self._getconn_unchecked(deadline - monotonic()) + try: + self._check_connection(conn) + except Exception: + self._putconn(conn, from_getconn=True) + else: + logger.info("connection given by %r", self.name) + return conn + + # Delay further checks to avoid a busy loop, using the same + # backoff policy used in reconnection attempts. + now = monotonic() + if not attempt: + attempt = ConnectionAttempt(reconnect_timeout=deadline - now) + else: + attempt.update_delay(now) + + if attempt.time_to_give_up(now): + raise PoolTimeout() + else: + sleep(attempt.delay) + def _getconn_unchecked(self, timeout: float) -> CT: # Critical section: decide here if there's a connection ready # or if the client needs to wait. diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index 0a7e7e302..add6fa84e 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -25,7 +25,7 @@ from .base import ConnectionAttempt, BasePool from .errors import PoolClosed, PoolTimeout, TooManyRequests from ._compat import Deque, Self from ._acompat import ACondition, AEvent, ALock, AQueue, AWorker, aspawn, agather -from ._acompat import current_task_name +from ._acompat import asleep, current_task_name from .sched_async import AsyncScheduler if True: # ASYNC @@ -217,15 +217,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): self._check_open_getconn() try: - while True: - conn = await self._getconn_unchecked(deadline - monotonic()) - try: - await self._check_connection(conn) - except Exception: - await self._putconn(conn, from_getconn=True) - else: - logger.info("connection given by %r", self.name) - return conn + return await self._getconn_with_check_loop(deadline) # Re-raise the timeout exception presenting the user the global # timeout, not the per-attempt one. @@ -234,6 +226,32 @@ class AsyncConnectionPool(Generic[ACT], BasePool): f"couldn't get a connection after {timeout:.2f} sec" ) from None + async def _getconn_with_check_loop(self, deadline: float) -> ACT: + attempt: ConnectionAttempt | None = None + + while True: + conn = await self._getconn_unchecked(deadline - monotonic()) + try: + await self._check_connection(conn) + except Exception: + await self._putconn(conn, from_getconn=True) + else: + logger.info("connection given by %r", self.name) + return conn + + # Delay further checks to avoid a busy loop, using the same + # backoff policy used in reconnection attempts. + now = monotonic() + if not attempt: + attempt = ConnectionAttempt(reconnect_timeout=deadline - now) + else: + attempt.update_delay(now) + + if attempt.time_to_give_up(now): + raise PoolTimeout() + else: + await asleep(attempt.delay) + async def _getconn_unchecked(self, timeout: float) -> ACT: # Critical section: decide here if there's a connection ready # or if the client needs to wait. diff --git a/tests/pool/test_pool.py b/tests/pool/test_pool.py index 473fe17f5..910595785 100644 --- a/tests/pool/test_pool.py +++ b/tests/pool/test_pool.py @@ -979,3 +979,34 @@ def test_cancellation_in_queue(dsn): with p.connection() as conn: cur = conn.execute("select 1") assert cur.fetchone() == (1,) + + +@pytest.mark.slow +@pytest.mark.timing +def test_check_backoff(dsn, caplog, monkeypatch): + caplog.set_level(logging.INFO, logger="psycopg.pool") + + assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0 + assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1 + monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1) + monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0) + + def check(conn): + raise Exception() + + caplog.clear() + with pool.ConnectionPool(dsn, min_size=1, check=check) as p: + p.wait(2.0) + + with pytest.raises(pool.PoolTimeout): + with p.connection(timeout=1.0): + assert False + + times = [rec.created for rec in caplog.records if "failed check" in rec.message] + assert times[1] - times[0] < 0.05 + deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)] + assert len(deltas) == 3 + want = 0.1 + for delta in deltas: + assert delta == pytest.approx(want, 0.05), deltas + want *= 2 diff --git a/tests/pool/test_pool_async.py b/tests/pool/test_pool_async.py index b192bf862..ccb9c4aa3 100644 --- a/tests/pool/test_pool_async.py +++ b/tests/pool/test_pool_async.py @@ -986,3 +986,34 @@ async def test_cancellation_in_queue(dsn): async with p.connection() as conn: cur = await conn.execute("select 1") assert await cur.fetchone() == (1,) + + +@pytest.mark.slow +@pytest.mark.timing +async def test_check_backoff(dsn, caplog, monkeypatch): + caplog.set_level(logging.INFO, logger="psycopg.pool") + + assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0 + assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1 + monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1) + monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0) + + async def check(conn): + raise Exception() + + caplog.clear() + async with pool.AsyncConnectionPool(dsn, min_size=1, check=check) as p: + await p.wait(2.0) + + with pytest.raises(pool.PoolTimeout): + async with p.connection(timeout=1.0): + assert False + + times = [rec.created for rec in caplog.records if "failed check" in rec.message] + assert times[1] - times[0] < 0.05 + deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)] + assert len(deltas) == 3 + want = 0.1 + for delta in deltas: + assert delta == pytest.approx(want, 0.05), deltas + want *= 2