# Copyright (C) 2021 The Psycopg Team
-import random
import logging
import threading
from queue import Queue, Empty
+from random import random
from typing import Any, Callable, Deque, Dict, Generic, List, Optional
from collections import deque
StopWorker is received.
"""
# Don't make all the workers time out at the same moment
- timeout = WORKER_TIMEOUT * (0.9 + 0.1 * random.random())
+ timeout = cls._jitter(WORKER_TIMEOUT, -0.1, 0.1)
while True:
# Use a timeout to make the wait interruptable
try:
"task run %s failed: %s: %s", task, e.__class__.__name__, e
)
+ @classmethod
+ def _jitter(cls, value: float, min_pc: float, max_pc: float) -> float:
+ """
+ Add a random value to *value* between *min_pc* and *max_pc* percent.
+ """
+ return value * (1.0 + ((max_pc - min_pc) * random()) + min_pc)
+
class ConnectionAttempt:
"""Keep the state of a connection attempt."""
"""Calculate how long to wait for a new connection attempt"""
if self.delay == 0.0:
self.give_up_at = now + self.reconnect_timeout
- # +/- 10% of the initial delay
- jitter = self.INITIAL_DELAY * (
- (2.0 * self.DELAY_JITTER * random.random()) - self.DELAY_JITTER
+ self.delay = BasePool._jitter(
+ self.INITIAL_DELAY, -self.DELAY_JITTER, self.DELAY_JITTER
)
- self.delay = self.INITIAL_DELAY + jitter
else:
self.delay *= self.DELAY_BACKOFF
assert size == [2, 1, 3, 4, 3, 2, 2]
+def test_jitter():
+ rnds = [pool.ConnectionPool._jitter(30, -0.1, +0.2) for i in range(100)]
+ rnds.sort()
+ assert 27 <= min(rnds) <= 28
+ assert 35 < max(rnds) < 36
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds
assert size == [2, 1, 3, 4, 3, 2, 2]
+def test_jitter():
+ rnds = [
+ pool.AsyncConnectionPool._jitter(30, -0.1, +0.2) for i in range(100)
+ ]
+ rnds.sort()
+ assert 27 <= min(rnds) <= 28
+ assert 35 < max(rnds) < 36
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds