]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix(pool): use an exponential backoff to separate failing checks
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 29 Dec 2023 23:41:07 +0000 (00:41 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 6 Jan 2024 12:04:47 +0000 (13:04 +0100)
This is consistent behaviour with the exponential backoff used in
reconnection attempts, and actually we could reuse the same object
implementing the backoff + jitter.

See also #709

docs/news_pool.rst
psycopg_pool/psycopg_pool/_acompat.py
psycopg_pool/psycopg_pool/pool.py
psycopg_pool/psycopg_pool/pool_async.py
tests/pool/test_pool.py
tests/pool/test_pool_async.py

index ea969810e26ac4659b620e62832648cc76122730..84c2020779457882203264c7d05860fe1ffeaa97 100644 (file)
@@ -13,8 +13,9 @@ Future releases
 psycopg_pool 3.2.1 (unreleased)
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-- Respect timeout on `~ConnectionPool.connection()` when `!check` fails
-  (:ticket:`#709`).
+- Respect the `!timeout` parameter on `~ConnectionPool.connection()` when
+  `!check` fails. Also avoid a busy-loop of checking; separate check attempts
+  using an exponential backoff (:ticket:`#709`).
 - Use `typing.Self` as a more correct return value annotation of context
   managers and other self-returning methods (see :ticket:`708`).
 
index d6417590ba67e24402601ee7a8b3b97c1d1839b1..4e4fa20b0a42320e7a18e3441a1741f54d7d7f7f 100644 (file)
@@ -10,6 +10,7 @@ when generating the sync version.
 
 from __future__ import annotations
 
+import time
 import queue
 import asyncio
 import logging
@@ -28,6 +29,7 @@ Event = threading.Event
 Condition = threading.Condition
 Lock = threading.RLock
 ALock = asyncio.Lock
+sleep = time.sleep
 
 Worker: TypeAlias = threading.Thread
 AWorker: TypeAlias = "asyncio.Task[None]"
@@ -162,3 +164,10 @@ def gather(*tasks: threading.Thread, timeout: float | None = None) -> None:
         if not t.is_alive():
             continue
         logger.warning("couldn't stop thread %r within %s seconds", t.name, timeout)
+
+
+def asleep(seconds: float) -> Coroutine[Any, Any, None]:
+    """
+    Equivalent to asyncio.sleep(), converted to time.sleep() by async_to_sync.
+    """
+    return asyncio.sleep(seconds)
index 0b2d7ea3371943e6a9bfc0bedba8335b25e44002..90680d16f9ffeb377e0f342edadc40223c40931d 100644 (file)
@@ -28,7 +28,7 @@ from .base import ConnectionAttempt, BasePool
 from .errors import PoolClosed, PoolTimeout, TooManyRequests
 from ._compat import Deque, Self
 from ._acompat import Condition, Event, Lock, Queue, Worker, spawn, gather
-from ._acompat import current_thread_name
+from ._acompat import sleep, current_thread_name
 from .sched import Scheduler
 
 
@@ -197,15 +197,7 @@ class ConnectionPool(Generic[CT], BasePool):
         self._check_open_getconn()
 
         try:
-            while True:
-                conn = self._getconn_unchecked(deadline - monotonic())
-                try:
-                    self._check_connection(conn)
-                except Exception:
-                    self._putconn(conn, from_getconn=True)
-                else:
-                    logger.info("connection given by %r", self.name)
-                    return conn
+            return self._getconn_with_check_loop(deadline)
         # Re-raise the timeout exception presenting the user the global
         # timeout, not the per-attempt one.
         except PoolTimeout:
@@ -213,6 +205,32 @@ class ConnectionPool(Generic[CT], BasePool):
                 f"couldn't get a connection after {timeout:.2f} sec"
             ) from None
 
+    def _getconn_with_check_loop(self, deadline: float) -> CT:
+        attempt: ConnectionAttempt | None = None
+
+        while True:
+            conn = self._getconn_unchecked(deadline - monotonic())
+            try:
+                self._check_connection(conn)
+            except Exception:
+                self._putconn(conn, from_getconn=True)
+            else:
+                logger.info("connection given by %r", self.name)
+                return conn
+
+            # Delay further checks to avoid a busy loop, using the same
+            # backoff policy used in reconnection attempts.
+            now = monotonic()
+            if not attempt:
+                attempt = ConnectionAttempt(reconnect_timeout=deadline - now)
+            else:
+                attempt.update_delay(now)
+
+            if attempt.time_to_give_up(now):
+                raise PoolTimeout()
+            else:
+                sleep(attempt.delay)
+
     def _getconn_unchecked(self, timeout: float) -> CT:
         # Critical section: decide here if there's a connection ready
         # or if the client needs to wait.
index 0a7e7e302293e031052c1f4eaf90d726427a4329..add6fa84ef18f09b7ce5f1b57d5a6129962a82cc 100644 (file)
@@ -25,7 +25,7 @@ from .base import ConnectionAttempt, BasePool
 from .errors import PoolClosed, PoolTimeout, TooManyRequests
 from ._compat import Deque, Self
 from ._acompat import ACondition, AEvent, ALock, AQueue, AWorker, aspawn, agather
-from ._acompat import current_task_name
+from ._acompat import asleep, current_task_name
 from .sched_async import AsyncScheduler
 
 if True:  # ASYNC
@@ -217,15 +217,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         self._check_open_getconn()
 
         try:
-            while True:
-                conn = await self._getconn_unchecked(deadline - monotonic())
-                try:
-                    await self._check_connection(conn)
-                except Exception:
-                    await self._putconn(conn, from_getconn=True)
-                else:
-                    logger.info("connection given by %r", self.name)
-                    return conn
+            return await self._getconn_with_check_loop(deadline)
 
         # Re-raise the timeout exception presenting the user the global
         # timeout, not the per-attempt one.
@@ -234,6 +226,32 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
                 f"couldn't get a connection after {timeout:.2f} sec"
             ) from None
 
+    async def _getconn_with_check_loop(self, deadline: float) -> ACT:
+        attempt: ConnectionAttempt | None = None
+
+        while True:
+            conn = await self._getconn_unchecked(deadline - monotonic())
+            try:
+                await self._check_connection(conn)
+            except Exception:
+                await self._putconn(conn, from_getconn=True)
+            else:
+                logger.info("connection given by %r", self.name)
+                return conn
+
+            # Delay further checks to avoid a busy loop, using the same
+            # backoff policy used in reconnection attempts.
+            now = monotonic()
+            if not attempt:
+                attempt = ConnectionAttempt(reconnect_timeout=deadline - now)
+            else:
+                attempt.update_delay(now)
+
+            if attempt.time_to_give_up(now):
+                raise PoolTimeout()
+            else:
+                await asleep(attempt.delay)
+
     async def _getconn_unchecked(self, timeout: float) -> ACT:
         # Critical section: decide here if there's a connection ready
         # or if the client needs to wait.
index 473fe17f545d2bad72d9fe3d6a9c4e8092baa2d9..9105957859b1eeb036ea6be25a125f84f414dbc7 100644 (file)
@@ -979,3 +979,34 @@ def test_cancellation_in_queue(dsn):
         with p.connection() as conn:
             cur = conn.execute("select 1")
             assert cur.fetchone() == (1,)
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+def test_check_backoff(dsn, caplog, monkeypatch):
+    caplog.set_level(logging.INFO, logger="psycopg.pool")
+
+    assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
+    assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
+    monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
+    monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
+
+    def check(conn):
+        raise Exception()
+
+    caplog.clear()
+    with pool.ConnectionPool(dsn, min_size=1, check=check) as p:
+        p.wait(2.0)
+
+        with pytest.raises(pool.PoolTimeout):
+            with p.connection(timeout=1.0):
+                assert False
+
+    times = [rec.created for rec in caplog.records if "failed check" in rec.message]
+    assert times[1] - times[0] < 0.05
+    deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
+    assert len(deltas) == 3
+    want = 0.1
+    for delta in deltas:
+        assert delta == pytest.approx(want, 0.05), deltas
+        want *= 2
index b192bf86236a3d2f7d3294351d37f38bd595c5c7..ccb9c4aa344090e83a2008700280819ebe3103cc 100644 (file)
@@ -986,3 +986,34 @@ async def test_cancellation_in_queue(dsn):
         async with p.connection() as conn:
             cur = await conn.execute("select 1")
             assert await cur.fetchone() == (1,)
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+async def test_check_backoff(dsn, caplog, monkeypatch):
+    caplog.set_level(logging.INFO, logger="psycopg.pool")
+
+    assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
+    assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
+    monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
+    monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
+
+    async def check(conn):
+        raise Exception()
+
+    caplog.clear()
+    async with pool.AsyncConnectionPool(dsn, min_size=1, check=check) as p:
+        await p.wait(2.0)
+
+        with pytest.raises(pool.PoolTimeout):
+            async with p.connection(timeout=1.0):
+                assert False
+
+    times = [rec.created for rec in caplog.records if "failed check" in rec.message]
+    assert times[1] - times[0] < 0.05
+    deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
+    assert len(deltas) == 3
+    want = 0.1
+    for delta in deltas:
+        assert delta == pytest.approx(want, 0.05), deltas
+        want *= 2