conn._expire_at = monotonic() + self._jitter(self.max_lifetime, -0.05, 0.0)
-class ConnectionAttempt:
- """Keep the state of a connection attempt."""
+class AttemptWithBackoff:
+ """
+ Keep the state of a repeated operation attempt with exponential backoff.
+ """
INITIAL_DELAY = 1.0
DELAY_JITTER = 0.1
DELAY_BACKOFF = 2.0
- def __init__(self, *, reconnect_timeout: float):
- self.reconnect_timeout = reconnect_timeout
+ def __init__(self, *, timeout: float):
+ self.timeout = timeout
self.delay = 0.0
self.give_up_at = 0.0
def update_delay(self, now: float) -> None:
"""Calculate how long to wait for a new connection attempt"""
if self.delay == 0.0:
- self.give_up_at = now + self.reconnect_timeout
+ self.give_up_at = now + self.timeout
self.delay = BasePool._jitter(
self.INITIAL_DELAY, -self.DELAY_JITTER, self.DELAY_JITTER
)
self.delay = max(0.0, self.give_up_at - now)
def time_to_give_up(self, now: float) -> bool:
- """Return True if we are tired of trying to connect. Meh."""
+ """Return True if we are tired of trying this attempt. Meh."""
return self.give_up_at > 0.0 and now >= self.give_up_at
from psycopg.pq import TransactionStatus
from .abc import CT, ConnectionCB, ConnectFailedCB
-from .base import ConnectionAttempt, BasePool
+from .base import AttemptWithBackoff, BasePool
from .errors import PoolClosed, PoolTimeout, TooManyRequests
from ._compat import Deque, Self
from ._acompat import Condition, Event, Lock, Queue, Worker, spawn, gather
) from None
def _getconn_with_check_loop(self, deadline: float) -> CT:
- attempt: ConnectionAttempt | None = None
+ attempt: AttemptWithBackoff | None = None
while True:
conn = self._getconn_unchecked(deadline - monotonic())
# backoff policy used in reconnection attempts.
now = monotonic()
if not attempt:
- attempt = ConnectionAttempt(reconnect_timeout=deadline - now)
+ attempt = AttemptWithBackoff(timeout=deadline - now)
else:
attempt.update_delay(now)
return conn
def _add_connection(
- self, attempt: Optional[ConnectionAttempt], growing: bool = False
+ self, attempt: Optional[AttemptWithBackoff], growing: bool = False
) -> None:
"""Try to connect and add the connection to the pool.
"""
now = monotonic()
if not attempt:
- attempt = ConnectionAttempt(reconnect_timeout=self.reconnect_timeout)
+ attempt = AttemptWithBackoff(timeout=self.reconnect_timeout)
try:
conn = self._connect()
def __init__(
self,
pool: ConnectionPool[Any],
- attempt: Optional[ConnectionAttempt] = None,
+ attempt: Optional[AttemptWithBackoff] = None,
growing: bool = False,
):
super().__init__(pool)
from psycopg.pq import TransactionStatus
from .abc import ACT, AsyncConnectionCB, AsyncConnectFailedCB
-from .base import ConnectionAttempt, BasePool
+from .base import AttemptWithBackoff, BasePool
from .errors import PoolClosed, PoolTimeout, TooManyRequests
from ._compat import Deque, Self
from ._acompat import ACondition, AEvent, ALock, AQueue, AWorker, aspawn, agather
) from None
async def _getconn_with_check_loop(self, deadline: float) -> ACT:
- attempt: ConnectionAttempt | None = None
+ attempt: AttemptWithBackoff | None = None
while True:
conn = await self._getconn_unchecked(deadline - monotonic())
# backoff policy used in reconnection attempts.
now = monotonic()
if not attempt:
- attempt = ConnectionAttempt(reconnect_timeout=deadline - now)
+ attempt = AttemptWithBackoff(timeout=deadline - now)
else:
attempt.update_delay(now)
return conn
async def _add_connection(
- self, attempt: Optional[ConnectionAttempt], growing: bool = False
+ self, attempt: Optional[AttemptWithBackoff], growing: bool = False
) -> None:
"""Try to connect and add the connection to the pool.
"""
now = monotonic()
if not attempt:
- attempt = ConnectionAttempt(reconnect_timeout=self.reconnect_timeout)
+ attempt = AttemptWithBackoff(timeout=self.reconnect_timeout)
try:
conn = await self._connect()
def __init__(
self,
pool: AsyncConnectionPool[Any],
- attempt: Optional[ConnectionAttempt] = None,
+ attempt: Optional[AttemptWithBackoff] = None,
growing: bool = False,
):
super().__init__(pool)
def test_reconnect(proxy, caplog, monkeypatch):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
- assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
- assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
- monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
- monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
+ assert pool.base.AttemptWithBackoff.INITIAL_DELAY == 1.0
+ assert pool.base.AttemptWithBackoff.DELAY_JITTER == 0.1
+ monkeypatch.setattr(pool.base.AttemptWithBackoff, "INITIAL_DELAY", 0.1)
+ monkeypatch.setattr(pool.base.AttemptWithBackoff, "DELAY_JITTER", 0.0)
caplog.clear()
proxy.start()
def test_check_backoff(dsn, caplog, monkeypatch):
caplog.set_level(logging.INFO, logger="psycopg.pool")
- assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
- assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
- monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
- monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
+ assert pool.base.AttemptWithBackoff.INITIAL_DELAY == 1.0
+ assert pool.base.AttemptWithBackoff.DELAY_JITTER == 0.1
+ monkeypatch.setattr(pool.base.AttemptWithBackoff, "INITIAL_DELAY", 0.1)
+ monkeypatch.setattr(pool.base.AttemptWithBackoff, "DELAY_JITTER", 0.0)
def check(conn):
raise Exception()
async def test_reconnect(proxy, caplog, monkeypatch):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
- assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
- assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
- monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
- monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
+ assert pool.base.AttemptWithBackoff.INITIAL_DELAY == 1.0
+ assert pool.base.AttemptWithBackoff.DELAY_JITTER == 0.1
+ monkeypatch.setattr(pool.base.AttemptWithBackoff, "INITIAL_DELAY", 0.1)
+ monkeypatch.setattr(pool.base.AttemptWithBackoff, "DELAY_JITTER", 0.0)
caplog.clear()
proxy.start()
async def test_check_backoff(dsn, caplog, monkeypatch):
caplog.set_level(logging.INFO, logger="psycopg.pool")
- assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
- assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
- monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
- monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
+ assert pool.base.AttemptWithBackoff.INITIAL_DELAY == 1.0
+ assert pool.base.AttemptWithBackoff.DELAY_JITTER == 0.1
+ monkeypatch.setattr(pool.base.AttemptWithBackoff, "INITIAL_DELAY", 0.1)
+ monkeypatch.setattr(pool.base.AttemptWithBackoff, "DELAY_JITTER", 0.0)
async def check(conn):
raise Exception()