psycopg_pool 3.2.1 (unreleased)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-- Respect timeout on `~ConnectionPool.connection()` when `!check` fails
- (:ticket:`#709`).
+- Respect the `!timeout` parameter on `~ConnectionPool.connection()` when
+ `!check` fails. Also avoid a busy-loop of checking; separate check attempts
+ using an exponential backoff (:ticket:`#709`).
- Use `typing.Self` as a more correct return value annotation of context
managers and other self-returning methods (see :ticket:`708`).
from __future__ import annotations
+import time
import queue
import asyncio
import logging
Condition = threading.Condition
Lock = threading.RLock
ALock = asyncio.Lock
+sleep = time.sleep
Worker: TypeAlias = threading.Thread
AWorker: TypeAlias = "asyncio.Task[None]"
if not t.is_alive():
continue
logger.warning("couldn't stop thread %r within %s seconds", t.name, timeout)
+
+
+def asleep(seconds: float) -> Coroutine[Any, Any, None]:
+ """
+ Equivalent to asyncio.sleep(), converted to time.sleep() by async_to_sync.
+ """
+ return asyncio.sleep(seconds)
from .errors import PoolClosed, PoolTimeout, TooManyRequests
from ._compat import Deque, Self
from ._acompat import Condition, Event, Lock, Queue, Worker, spawn, gather
-from ._acompat import current_thread_name
+from ._acompat import sleep, current_thread_name
from .sched import Scheduler
self._check_open_getconn()
try:
- while True:
- conn = self._getconn_unchecked(deadline - monotonic())
- try:
- self._check_connection(conn)
- except Exception:
- self._putconn(conn, from_getconn=True)
- else:
- logger.info("connection given by %r", self.name)
- return conn
+ return self._getconn_with_check_loop(deadline)
# Re-raise the timeout exception presenting the user the global
# timeout, not the per-attempt one.
except PoolTimeout:
f"couldn't get a connection after {timeout:.2f} sec"
) from None
+ def _getconn_with_check_loop(self, deadline: float) -> CT:
+ attempt: ConnectionAttempt | None = None
+
+ while True:
+ conn = self._getconn_unchecked(deadline - monotonic())
+ try:
+ self._check_connection(conn)
+ except Exception:
+ self._putconn(conn, from_getconn=True)
+ else:
+ logger.info("connection given by %r", self.name)
+ return conn
+
+ # Delay further checks to avoid a busy loop, using the same
+ # backoff policy used in reconnection attempts.
+ now = monotonic()
+ if not attempt:
+ attempt = ConnectionAttempt(reconnect_timeout=deadline - now)
+ else:
+ attempt.update_delay(now)
+
+ if attempt.time_to_give_up(now):
+ raise PoolTimeout()
+ else:
+ sleep(attempt.delay)
+
def _getconn_unchecked(self, timeout: float) -> CT:
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
from .errors import PoolClosed, PoolTimeout, TooManyRequests
from ._compat import Deque, Self
from ._acompat import ACondition, AEvent, ALock, AQueue, AWorker, aspawn, agather
-from ._acompat import current_task_name
+from ._acompat import asleep, current_task_name
from .sched_async import AsyncScheduler
if True: # ASYNC
self._check_open_getconn()
try:
- while True:
- conn = await self._getconn_unchecked(deadline - monotonic())
- try:
- await self._check_connection(conn)
- except Exception:
- await self._putconn(conn, from_getconn=True)
- else:
- logger.info("connection given by %r", self.name)
- return conn
+ return await self._getconn_with_check_loop(deadline)
# Re-raise the timeout exception presenting the user the global
# timeout, not the per-attempt one.
f"couldn't get a connection after {timeout:.2f} sec"
) from None
+ async def _getconn_with_check_loop(self, deadline: float) -> ACT:
+ attempt: ConnectionAttempt | None = None
+
+ while True:
+ conn = await self._getconn_unchecked(deadline - monotonic())
+ try:
+ await self._check_connection(conn)
+ except Exception:
+ await self._putconn(conn, from_getconn=True)
+ else:
+ logger.info("connection given by %r", self.name)
+ return conn
+
+ # Delay further checks to avoid a busy loop, using the same
+ # backoff policy used in reconnection attempts.
+ now = monotonic()
+ if not attempt:
+ attempt = ConnectionAttempt(reconnect_timeout=deadline - now)
+ else:
+ attempt.update_delay(now)
+
+ if attempt.time_to_give_up(now):
+ raise PoolTimeout()
+ else:
+ await asleep(attempt.delay)
+
async def _getconn_unchecked(self, timeout: float) -> ACT:
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
with p.connection() as conn:
cur = conn.execute("select 1")
assert cur.fetchone() == (1,)
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+def test_check_backoff(dsn, caplog, monkeypatch):
+ caplog.set_level(logging.INFO, logger="psycopg.pool")
+
+ assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
+ assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
+ monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
+ monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
+
+ def check(conn):
+ raise Exception()
+
+ caplog.clear()
+ with pool.ConnectionPool(dsn, min_size=1, check=check) as p:
+ p.wait(2.0)
+
+ with pytest.raises(pool.PoolTimeout):
+ with p.connection(timeout=1.0):
+ assert False
+
+ times = [rec.created for rec in caplog.records if "failed check" in rec.message]
+ assert times[1] - times[0] < 0.05
+ deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
+ assert len(deltas) == 3
+ want = 0.1
+ for delta in deltas:
+ assert delta == pytest.approx(want, 0.05), deltas
+ want *= 2
async with p.connection() as conn:
cur = await conn.execute("select 1")
assert await cur.fetchone() == (1,)
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+async def test_check_backoff(dsn, caplog, monkeypatch):
+ caplog.set_level(logging.INFO, logger="psycopg.pool")
+
+ assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
+ assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
+ monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
+ monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
+
+ async def check(conn):
+ raise Exception()
+
+ caplog.clear()
+ async with pool.AsyncConnectionPool(dsn, min_size=1, check=check) as p:
+ await p.wait(2.0)
+
+ with pytest.raises(pool.PoolTimeout):
+ async with p.connection(timeout=1.0):
+ assert False
+
+ times = [rec.created for rec in caplog.records if "failed check" in rec.message]
+ assert times[1] - times[0] < 0.05
+ deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
+ assert len(deltas) == 3
+ want = 0.1
+ for delta in deltas:
+ assert delta == pytest.approx(want, 0.05), deltas
+ want *= 2