]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor(pool): rename ConnectionAttempt as AttemptWithBackoff 711/head
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 29 Dec 2023 23:49:28 +0000 (00:49 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 6 Jan 2024 12:04:47 +0000 (13:04 +0100)
This class implements the backoff + jitter policy and is now used to
repeat checks too.

psycopg_pool/psycopg_pool/base.py
psycopg_pool/psycopg_pool/pool.py
psycopg_pool/psycopg_pool/pool_async.py
tests/pool/test_pool.py
tests/pool/test_pool_async.py

index 1ed6ab1c92995b1cf780399a7f9cf9d26b5412f6..5b0c839918a6d3df01d61ab559ad9195f9993689 100644 (file)
@@ -200,22 +200,24 @@ class BasePool:
         conn._expire_at = monotonic() + self._jitter(self.max_lifetime, -0.05, 0.0)
 
 
-class ConnectionAttempt:
-    """Keep the state of a connection attempt."""
+class AttemptWithBackoff:
+    """
+    Keep the state of a repeated operation attempt with exponential backoff.
+    """
 
     INITIAL_DELAY = 1.0
     DELAY_JITTER = 0.1
     DELAY_BACKOFF = 2.0
 
-    def __init__(self, *, reconnect_timeout: float):
-        self.reconnect_timeout = reconnect_timeout
+    def __init__(self, *, timeout: float):
+        self.timeout = timeout
         self.delay = 0.0
         self.give_up_at = 0.0
 
     def update_delay(self, now: float) -> None:
         """Calculate how long to wait for a new connection attempt"""
         if self.delay == 0.0:
-            self.give_up_at = now + self.reconnect_timeout
+            self.give_up_at = now + self.timeout
             self.delay = BasePool._jitter(
                 self.INITIAL_DELAY, -self.DELAY_JITTER, self.DELAY_JITTER
             )
@@ -226,5 +228,5 @@ class ConnectionAttempt:
             self.delay = max(0.0, self.give_up_at - now)
 
     def time_to_give_up(self, now: float) -> bool:
-        """Return True if we are tired of trying to connect. Meh."""
+        """Return True if we are tired of trying this attempt. Meh."""
         return self.give_up_at > 0.0 and now >= self.give_up_at
index 90680d16f9ffeb377e0f342edadc40223c40931d..7ab999b1eaa6c1b764c1ee7fa0f0a9179c3fde5e 100644 (file)
@@ -24,7 +24,7 @@ from psycopg import Connection
 from psycopg.pq import TransactionStatus
 
 from .abc import CT, ConnectionCB, ConnectFailedCB
-from .base import ConnectionAttempt, BasePool
+from .base import AttemptWithBackoff, BasePool
 from .errors import PoolClosed, PoolTimeout, TooManyRequests
 from ._compat import Deque, Self
 from ._acompat import Condition, Event, Lock, Queue, Worker, spawn, gather
@@ -206,7 +206,7 @@ class ConnectionPool(Generic[CT], BasePool):
             ) from None
 
     def _getconn_with_check_loop(self, deadline: float) -> CT:
-        attempt: ConnectionAttempt | None = None
+        attempt: AttemptWithBackoff | None = None
 
         while True:
             conn = self._getconn_unchecked(deadline - monotonic())
@@ -222,7 +222,7 @@ class ConnectionPool(Generic[CT], BasePool):
             # backoff policy used in reconnection attempts.
             now = monotonic()
             if not attempt:
-                attempt = ConnectionAttempt(reconnect_timeout=deadline - now)
+                attempt = AttemptWithBackoff(timeout=deadline - now)
             else:
                 attempt.update_delay(now)
 
@@ -622,7 +622,7 @@ class ConnectionPool(Generic[CT], BasePool):
         return conn
 
     def _add_connection(
-        self, attempt: Optional[ConnectionAttempt], growing: bool = False
+        self, attempt: Optional[AttemptWithBackoff], growing: bool = False
     ) -> None:
         """Try to connect and add the connection to the pool.
 
@@ -633,7 +633,7 @@ class ConnectionPool(Generic[CT], BasePool):
         """
         now = monotonic()
         if not attempt:
-            attempt = ConnectionAttempt(reconnect_timeout=self.reconnect_timeout)
+            attempt = AttemptWithBackoff(timeout=self.reconnect_timeout)
 
         try:
             conn = self._connect()
@@ -928,7 +928,7 @@ class AddConnection(MaintenanceTask):
     def __init__(
         self,
         pool: ConnectionPool[Any],
-        attempt: Optional[ConnectionAttempt] = None,
+        attempt: Optional[AttemptWithBackoff] = None,
         growing: bool = False,
     ):
         super().__init__(pool)
index add6fa84ef18f09b7ce5f1b57d5a6129962a82cc..fd8f63782bb6206a1c2171943a8703623bfa9cc8 100644 (file)
@@ -21,7 +21,7 @@ from psycopg import AsyncConnection
 from psycopg.pq import TransactionStatus
 
 from .abc import ACT, AsyncConnectionCB, AsyncConnectFailedCB
-from .base import ConnectionAttempt, BasePool
+from .base import AttemptWithBackoff, BasePool
 from .errors import PoolClosed, PoolTimeout, TooManyRequests
 from ._compat import Deque, Self
 from ._acompat import ACondition, AEvent, ALock, AQueue, AWorker, aspawn, agather
@@ -227,7 +227,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             ) from None
 
     async def _getconn_with_check_loop(self, deadline: float) -> ACT:
-        attempt: ConnectionAttempt | None = None
+        attempt: AttemptWithBackoff | None = None
 
         while True:
             conn = await self._getconn_unchecked(deadline - monotonic())
@@ -243,7 +243,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             # backoff policy used in reconnection attempts.
             now = monotonic()
             if not attempt:
-                attempt = ConnectionAttempt(reconnect_timeout=deadline - now)
+                attempt = AttemptWithBackoff(timeout=deadline - now)
             else:
                 attempt.update_delay(now)
 
@@ -666,7 +666,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         return conn
 
     async def _add_connection(
-        self, attempt: Optional[ConnectionAttempt], growing: bool = False
+        self, attempt: Optional[AttemptWithBackoff], growing: bool = False
     ) -> None:
         """Try to connect and add the connection to the pool.
 
@@ -677,7 +677,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         """
         now = monotonic()
         if not attempt:
-            attempt = ConnectionAttempt(reconnect_timeout=self.reconnect_timeout)
+            attempt = AttemptWithBackoff(timeout=self.reconnect_timeout)
 
         try:
             conn = await self._connect()
@@ -977,7 +977,7 @@ class AddConnection(MaintenanceTask):
     def __init__(
         self,
         pool: AsyncConnectionPool[Any],
-        attempt: Optional[ConnectionAttempt] = None,
+        attempt: Optional[AttemptWithBackoff] = None,
         growing: bool = False,
     ):
         super().__init__(pool)
index 9105957859b1eeb036ea6be25a125f84f414dbc7..59b0659c91dad66b5a53416fa7b6dd1c28cd3224 100644 (file)
@@ -475,10 +475,10 @@ def test_shrink(dsn, monkeypatch):
 def test_reconnect(proxy, caplog, monkeypatch):
     caplog.set_level(logging.WARNING, logger="psycopg.pool")
 
-    assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
-    assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
-    monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
-    monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
+    assert pool.base.AttemptWithBackoff.INITIAL_DELAY == 1.0
+    assert pool.base.AttemptWithBackoff.DELAY_JITTER == 0.1
+    monkeypatch.setattr(pool.base.AttemptWithBackoff, "INITIAL_DELAY", 0.1)
+    monkeypatch.setattr(pool.base.AttemptWithBackoff, "DELAY_JITTER", 0.0)
 
     caplog.clear()
     proxy.start()
@@ -986,10 +986,10 @@ def test_cancellation_in_queue(dsn):
 def test_check_backoff(dsn, caplog, monkeypatch):
     caplog.set_level(logging.INFO, logger="psycopg.pool")
 
-    assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
-    assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
-    monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
-    monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
+    assert pool.base.AttemptWithBackoff.INITIAL_DELAY == 1.0
+    assert pool.base.AttemptWithBackoff.DELAY_JITTER == 0.1
+    monkeypatch.setattr(pool.base.AttemptWithBackoff, "INITIAL_DELAY", 0.1)
+    monkeypatch.setattr(pool.base.AttemptWithBackoff, "DELAY_JITTER", 0.0)
 
     def check(conn):
         raise Exception()
index ccb9c4aa344090e83a2008700280819ebe3103cc..ca9b29cef45674749ed950b9a4899008d33f3d7e 100644 (file)
@@ -481,10 +481,10 @@ async def test_shrink(dsn, monkeypatch):
 async def test_reconnect(proxy, caplog, monkeypatch):
     caplog.set_level(logging.WARNING, logger="psycopg.pool")
 
-    assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
-    assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
-    monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
-    monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
+    assert pool.base.AttemptWithBackoff.INITIAL_DELAY == 1.0
+    assert pool.base.AttemptWithBackoff.DELAY_JITTER == 0.1
+    monkeypatch.setattr(pool.base.AttemptWithBackoff, "INITIAL_DELAY", 0.1)
+    monkeypatch.setattr(pool.base.AttemptWithBackoff, "DELAY_JITTER", 0.0)
 
     caplog.clear()
     proxy.start()
@@ -993,10 +993,10 @@ async def test_cancellation_in_queue(dsn):
 async def test_check_backoff(dsn, caplog, monkeypatch):
     caplog.set_level(logging.INFO, logger="psycopg.pool")
 
-    assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
-    assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
-    monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
-    monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
+    assert pool.base.AttemptWithBackoff.INITIAL_DELAY == 1.0
+    assert pool.base.AttemptWithBackoff.DELAY_JITTER == 0.1
+    monkeypatch.setattr(pool.base.AttemptWithBackoff, "INITIAL_DELAY", 0.1)
+    monkeypatch.setattr(pool.base.AttemptWithBackoff, "DELAY_JITTER", 0.0)
 
     async def check(conn):
         raise Exception()