From: Daniele Varrazzo Date: Fri, 29 Dec 2023 23:49:28 +0000 (+0100) Subject: refactor(pool): rename ConnectionAttempt as AttemptWithBackoff X-Git-Tag: pool-3.2.1~6^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1bf687e0bdbe5de3de8c805436a28ced5a2f530f;p=thirdparty%2Fpsycopg.git refactor(pool): rename ConnectionAttempt as AttemptWithBackoff This class implements the backoff + jitter policy and is now used to repeat checks too. --- diff --git a/psycopg_pool/psycopg_pool/base.py b/psycopg_pool/psycopg_pool/base.py index 1ed6ab1c9..5b0c83991 100644 --- a/psycopg_pool/psycopg_pool/base.py +++ b/psycopg_pool/psycopg_pool/base.py @@ -200,22 +200,24 @@ class BasePool: conn._expire_at = monotonic() + self._jitter(self.max_lifetime, -0.05, 0.0) -class ConnectionAttempt: - """Keep the state of a connection attempt.""" +class AttemptWithBackoff: + """ + Keep the state of a repeated operation attempt with exponential backoff. + """ INITIAL_DELAY = 1.0 DELAY_JITTER = 0.1 DELAY_BACKOFF = 2.0 - def __init__(self, *, reconnect_timeout: float): - self.reconnect_timeout = reconnect_timeout + def __init__(self, *, timeout: float): + self.timeout = timeout self.delay = 0.0 self.give_up_at = 0.0 def update_delay(self, now: float) -> None: """Calculate how long to wait for a new connection attempt""" if self.delay == 0.0: - self.give_up_at = now + self.reconnect_timeout + self.give_up_at = now + self.timeout self.delay = BasePool._jitter( self.INITIAL_DELAY, -self.DELAY_JITTER, self.DELAY_JITTER ) @@ -226,5 +228,5 @@ class ConnectionAttempt: self.delay = max(0.0, self.give_up_at - now) def time_to_give_up(self, now: float) -> bool: - """Return True if we are tired of trying to connect. Meh.""" + """Return True if we are tired of trying this attempt. Meh.""" return self.give_up_at > 0.0 and now >= self.give_up_at diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index 90680d16f..7ab999b1e 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -24,7 +24,7 @@ from psycopg import Connection from psycopg.pq import TransactionStatus from .abc import CT, ConnectionCB, ConnectFailedCB -from .base import ConnectionAttempt, BasePool +from .base import AttemptWithBackoff, BasePool from .errors import PoolClosed, PoolTimeout, TooManyRequests from ._compat import Deque, Self from ._acompat import Condition, Event, Lock, Queue, Worker, spawn, gather @@ -206,7 +206,7 @@ class ConnectionPool(Generic[CT], BasePool): ) from None def _getconn_with_check_loop(self, deadline: float) -> CT: - attempt: ConnectionAttempt | None = None + attempt: AttemptWithBackoff | None = None while True: conn = self._getconn_unchecked(deadline - monotonic()) @@ -222,7 +222,7 @@ class ConnectionPool(Generic[CT], BasePool): # backoff policy used in reconnection attempts. now = monotonic() if not attempt: - attempt = ConnectionAttempt(reconnect_timeout=deadline - now) + attempt = AttemptWithBackoff(timeout=deadline - now) else: attempt.update_delay(now) @@ -622,7 +622,7 @@ class ConnectionPool(Generic[CT], BasePool): return conn def _add_connection( - self, attempt: Optional[ConnectionAttempt], growing: bool = False + self, attempt: Optional[AttemptWithBackoff], growing: bool = False ) -> None: """Try to connect and add the connection to the pool. @@ -633,7 +633,7 @@ class ConnectionPool(Generic[CT], BasePool): """ now = monotonic() if not attempt: - attempt = ConnectionAttempt(reconnect_timeout=self.reconnect_timeout) + attempt = AttemptWithBackoff(timeout=self.reconnect_timeout) try: conn = self._connect() @@ -928,7 +928,7 @@ class AddConnection(MaintenanceTask): def __init__( self, pool: ConnectionPool[Any], - attempt: Optional[ConnectionAttempt] = None, + attempt: Optional[AttemptWithBackoff] = None, growing: bool = False, ): super().__init__(pool) diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index add6fa84e..fd8f63782 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -21,7 +21,7 @@ from psycopg import AsyncConnection from psycopg.pq import TransactionStatus from .abc import ACT, AsyncConnectionCB, AsyncConnectFailedCB -from .base import ConnectionAttempt, BasePool +from .base import AttemptWithBackoff, BasePool from .errors import PoolClosed, PoolTimeout, TooManyRequests from ._compat import Deque, Self from ._acompat import ACondition, AEvent, ALock, AQueue, AWorker, aspawn, agather @@ -227,7 +227,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): ) from None async def _getconn_with_check_loop(self, deadline: float) -> ACT: - attempt: ConnectionAttempt | None = None + attempt: AttemptWithBackoff | None = None while True: conn = await self._getconn_unchecked(deadline - monotonic()) @@ -243,7 +243,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): # backoff policy used in reconnection attempts. now = monotonic() if not attempt: - attempt = ConnectionAttempt(reconnect_timeout=deadline - now) + attempt = AttemptWithBackoff(timeout=deadline - now) else: attempt.update_delay(now) @@ -666,7 +666,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): return conn async def _add_connection( - self, attempt: Optional[ConnectionAttempt], growing: bool = False + self, attempt: Optional[AttemptWithBackoff], growing: bool = False ) -> None: """Try to connect and add the connection to the pool. @@ -677,7 +677,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): """ now = monotonic() if not attempt: - attempt = ConnectionAttempt(reconnect_timeout=self.reconnect_timeout) + attempt = AttemptWithBackoff(timeout=self.reconnect_timeout) try: conn = await self._connect() @@ -977,7 +977,7 @@ class AddConnection(MaintenanceTask): def __init__( self, pool: AsyncConnectionPool[Any], - attempt: Optional[ConnectionAttempt] = None, + attempt: Optional[AttemptWithBackoff] = None, growing: bool = False, ): super().__init__(pool) diff --git a/tests/pool/test_pool.py b/tests/pool/test_pool.py index 910595785..59b0659c9 100644 --- a/tests/pool/test_pool.py +++ b/tests/pool/test_pool.py @@ -475,10 +475,10 @@ def test_shrink(dsn, monkeypatch): def test_reconnect(proxy, caplog, monkeypatch): caplog.set_level(logging.WARNING, logger="psycopg.pool") - assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0 - assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1 - monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1) - monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0) + assert pool.base.AttemptWithBackoff.INITIAL_DELAY == 1.0 + assert pool.base.AttemptWithBackoff.DELAY_JITTER == 0.1 + monkeypatch.setattr(pool.base.AttemptWithBackoff, "INITIAL_DELAY", 0.1) + monkeypatch.setattr(pool.base.AttemptWithBackoff, "DELAY_JITTER", 0.0) caplog.clear() proxy.start() @@ -986,10 +986,10 @@ def test_cancellation_in_queue(dsn): def test_check_backoff(dsn, caplog, monkeypatch): caplog.set_level(logging.INFO, logger="psycopg.pool") - assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0 - assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1 - monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1) - monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0) + assert pool.base.AttemptWithBackoff.INITIAL_DELAY == 1.0 + assert pool.base.AttemptWithBackoff.DELAY_JITTER == 0.1 + monkeypatch.setattr(pool.base.AttemptWithBackoff, "INITIAL_DELAY", 0.1) + monkeypatch.setattr(pool.base.AttemptWithBackoff, "DELAY_JITTER", 0.0) def check(conn): raise Exception() diff --git a/tests/pool/test_pool_async.py b/tests/pool/test_pool_async.py index ccb9c4aa3..ca9b29cef 100644 --- a/tests/pool/test_pool_async.py +++ b/tests/pool/test_pool_async.py @@ -481,10 +481,10 @@ async def test_shrink(dsn, monkeypatch): async def test_reconnect(proxy, caplog, monkeypatch): caplog.set_level(logging.WARNING, logger="psycopg.pool") - assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0 - assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1 - monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1) - monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0) + assert pool.base.AttemptWithBackoff.INITIAL_DELAY == 1.0 + assert pool.base.AttemptWithBackoff.DELAY_JITTER == 0.1 + monkeypatch.setattr(pool.base.AttemptWithBackoff, "INITIAL_DELAY", 0.1) + monkeypatch.setattr(pool.base.AttemptWithBackoff, "DELAY_JITTER", 0.0) caplog.clear() proxy.start() @@ -993,10 +993,10 @@ async def test_cancellation_in_queue(dsn): async def test_check_backoff(dsn, caplog, monkeypatch): caplog.set_level(logging.INFO, logger="psycopg.pool") - assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0 - assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1 - monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1) - monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0) + assert pool.base.AttemptWithBackoff.INITIAL_DELAY == 1.0 + assert pool.base.AttemptWithBackoff.DELAY_JITTER == 0.1 + monkeypatch.setattr(pool.base.AttemptWithBackoff, "INITIAL_DELAY", 0.1) + monkeypatch.setattr(pool.base.AttemptWithBackoff, "DELAY_JITTER", 0.0) async def check(conn): raise Exception()