]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Add jitter method to pool
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 28 Feb 2021 02:55:29 +0000 (03:55 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 12 Mar 2021 04:07:25 +0000 (05:07 +0100)
psycopg3/psycopg3/pool/base.py
tests/pool/test_pool.py
tests/pool/test_pool_async.py

index c999ac11371364aec3a4cbafaf2ccb7409eb428a..c56b116c58e35e3aaf2e975aa5884f44a1db1d66 100644 (file)
@@ -4,10 +4,10 @@ psycopg3 connection pool base class and functionalities.
 
 # Copyright (C) 2021 The Psycopg Team
 
-import random
 import logging
 import threading
 from queue import Queue, Empty
+from random import random
 from typing import Any, Callable, Deque, Dict, Generic, List, Optional
 from collections import deque
 
@@ -164,7 +164,7 @@ class BasePool(Generic[ConnectionType]):
         StopWorker is received.
         """
         # Don't make all the workers time out at the same moment
-        timeout = WORKER_TIMEOUT * (0.9 + 0.1 * random.random())
+        timeout = cls._jitter(WORKER_TIMEOUT, -0.1, 0.1)
         while True:
             # Use a timeout to make the wait interruptable
             try:
@@ -187,6 +187,13 @@ class BasePool(Generic[ConnectionType]):
                     "task run %s failed: %s: %s", task, e.__class__.__name__, e
                 )
 
+    @classmethod
+    def _jitter(cls, value: float, min_pc: float, max_pc: float) -> float:
+        """
+        Add a random value to *value* between *min_pc* and *max_pc* percent.
+        """
+        return value * (1.0 + ((max_pc - min_pc) * random()) + min_pc)
+
 
 class ConnectionAttempt:
     """Keep the state of a connection attempt."""
@@ -204,11 +211,9 @@ class ConnectionAttempt:
         """Calculate how long to wait for a new connection attempt"""
         if self.delay == 0.0:
             self.give_up_at = now + self.reconnect_timeout
-            # +/- 10% of the initial delay
-            jitter = self.INITIAL_DELAY * (
-                (2.0 * self.DELAY_JITTER * random.random()) - self.DELAY_JITTER
+            self.delay = BasePool._jitter(
+                self.INITIAL_DELAY, -self.DELAY_JITTER, self.DELAY_JITTER
             )
-            self.delay = self.INITIAL_DELAY + jitter
         else:
             self.delay *= self.DELAY_BACKOFF
 
index 1356f8e08f2c80e43f26b820970d9b752f5b5c6b..49792724a47247708909827090dc99c9c94b0a19 100644 (file)
@@ -636,6 +636,13 @@ def test_resize(dsn):
     assert size == [2, 1, 3, 4, 3, 2, 2]
 
 
+def test_jitter():
+    rnds = [pool.ConnectionPool._jitter(30, -0.1, +0.2) for i in range(100)]
+    rnds.sort()
+    assert 27 <= min(rnds) <= 28
+    assert 35 < max(rnds) < 36
+
+
 def delay_connection(monkeypatch, sec):
     """
     Return a _connect_gen function delayed by the amount of seconds
index 0425947d3dec09cdb69189b52f8c35854ccd9780..7a19e036b314880fbad74d647cf939aa23096c38 100644 (file)
@@ -668,6 +668,15 @@ async def test_resize(dsn):
     assert size == [2, 1, 3, 4, 3, 2, 2]
 
 
+def test_jitter():
+    rnds = [
+        pool.AsyncConnectionPool._jitter(30, -0.1, +0.2) for i in range(100)
+    ]
+    rnds.sort()
+    assert 27 <= min(rnds) <= 28
+    assert 35 < max(rnds) < 36
+
+
 def delay_connection(monkeypatch, sec):
     """
     Return a _connect_gen function delayed by the amount of seconds