from . import errors as e
from .pq import TransactionStatus
+from ._sched import Scheduler
from .connection import Connection
WORKER_TIMEOUT = 60.0
name: Optional[str] = None,
timeout_sec: float = 30.0,
max_idle_sec: float = 10 * 60.0,
+ reconnect_timeout: float = 5 * 60.0,
+ reconnect_failed: Optional[Callable[["ConnectionPool"], None]] = None,
num_workers: int = 3,
):
if maxconn is None:
self.kwargs: Dict[str, Any] = kwargs or {}
self._configure: Callable[[Connection], None]
self._configure = configure or (lambda conn: None)
+ self._reconnect_failed: Callable[["ConnectionPool"], None]
+ self._reconnect_failed = reconnect_failed or (lambda pool: None)
self.name = name
self.minconn = minconn
self.maxconn = maxconn
self.timeout_sec = timeout_sec
+ self.reconnect_timeout = reconnect_timeout
self.max_idle_sec = max_idle_sec
self.num_workers = num_workers
self._nconns = minconn # currently in the pool, out, being prepared
self._pool: Deque[Tuple[Connection, float]] = deque()
self._waiting: Deque["WaitingClient"] = deque()
- self._lock = threading.Lock()
+ self._lock = threading.RLock()
self._closed = False
+ self.sched = Scheduler()
self._wqueue: "Queue[MaintenanceTask]" = Queue()
self._workers: List[threading.Thread] = []
t.start()
self._workers.append(t)
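+ # Run the scheduler in its own daemon thread: it executes maintenance
+ # tasks (e.g. reconnection retries) at the time they are scheduled for.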
+ self._sched_runner = threading.Thread(target=self.sched.run)
+ self._sched_runner.daemon = True
+ self._sched_runner.start()
+
# Populate the pool with initial minconn connections
event = threading.Event()
for i in range(self._nconns):
# Extract the first client from the queue
pos = self._waiting.popleft()
else:
- now = time.time()
+ now = time.monotonic()
# No client waiting for a connection: put it back into the pool
self._pool.append((conn, now))
# Now that the flag _closed is set, getconn will fail immediately and
# putconn will just close the returned connection.
+ # Stop the scheduler
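+ # (a task with a None callback signals the scheduler runner to exit)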
+ self.sched.enter(0, None)
+
# Signal to any client in the queue that business is closed.
while self._waiting:
pos = self._waiting.popleft()
"""Configure a connection after creation."""
self._configure(conn)
+ def reconnect_failed(self) -> None:
+ """
+ Called when reconnection failed for longer than `reconnect_timeout`.
+ """
+ self._reconnect_failed(self)
+
class WaitingClient:
"""An position in a queue for a client waiting for a connection."""
pass
-class AddConnection(MaintenanceTask):
- """Add a new connection into to the pool."""
-
- def _run(self, pool: ConnectionPool) -> None:
- conn = pool._connect()
- pool._add_to_pool(conn)
-
-
-class AddInitialConnection(AddConnection):
+class AddInitialConnection(MaintenanceTask):
"""Add a new connection into to the pool.
If the desired number of connections is reached notify the event.
self.event = event
def _run(self, pool: ConnectionPool) -> None:
- super()._run(pool)
+ conn = pool._connect()
+ pool._add_to_pool(conn)
if len(pool._pool) >= pool._nconns:
self.event.set()
+class AddConnection(MaintenanceTask):
+ """Add a new connection to the pool, retrying on failure."""
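+ # The first retry happens after INITIAL_DELAY seconds, randomised by
+ # +/- DELAY_JITTER (as a fraction of the delay); each following retry
+ # waits DELAY_BACKOFF times longer, until reconnect_timeout expires.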
+ INITIAL_DELAY = 1.0
+ DELAY_JITTER = 0.1
+ DELAY_BACKOFF = 2.0
+
+ def __init__(self, pool: ConnectionPool):
+ super().__init__(pool)
+ self.delay = 0.0
+ self.give_up_at = 0.0
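+ # Backoff state: delay stays 0.0 until the first failure, when
+ # give_up_at is also set to now + pool.reconnect_timeout.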
+
+ def _run(self, pool: ConnectionPool) -> None:
+ try:
+ conn = pool._connect()
+ except Exception as ex:
+ logger.warning("error reconnecting in %r: %s", pool.name, ex)
+ self._handle_error(pool)
+ else:
+ pool._add_to_pool(conn)
+
+ def _handle_error(self, pool: ConnectionPool) -> None:
+ """Called after a connection failure.
+
+ Calculate the delay before a new connection attempt and schedule a
+ retry in the future. If too many attempts have failed, give up by
+ decreasing the pool connection number and calling
+ `pool.reconnect_failed()`.
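+
+ With the default class constants, retries are scheduled roughly 1, 2,
+ 4, 8, ... seconds apart (the first delay randomised by +/- 10%) until
+ `reconnect_timeout` has elapsed.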
+ """
+ now = time.monotonic()
+ if self.give_up_at and now >= self.give_up_at:
+ logger.warning(
+ "reconnection attempt in pool %r failed after %s sec",
+ pool.name,
+ pool.reconnect_timeout,
+ )
+ with pool._lock:
+ pool._nconns -= 1
+ pool.reconnect_failed()
+ return
+
+ # Calculate how long to wait for a new connection attempt
+ if self.delay == 0.0:
+ self.give_up_at = now + pool.reconnect_timeout
+ # +/- 10% of the initial delay
+ jitter = self.INITIAL_DELAY * (
+ (2.0 * self.DELAY_JITTER * random.random()) - self.DELAY_JITTER
+ )
+ self.delay = self.INITIAL_DELAY + jitter
+ else:
+ self.delay *= self.DELAY_BACKOFF
+
+ # Schedule a run of self.retry() some time in the future
+ if now + self.delay < self.give_up_at:
+ pool.sched.enter(self.delay, self.retry)
+ else:
+ pool.sched.enterabs(self.give_up_at, self.retry)
+
+ def retry(self) -> None:
+ pool = self.pool()
+ if not pool:
+ return
+
+ pool.add_task(self)
+
+
class ReturnConnection(MaintenanceTask):
"""Clean up and return a connection to the pool."""
--- /dev/null
+import time
+import socket
+import subprocess as sp
+from shutil import which
+
+import pytest
+
+from psycopg3 import conninfo
+
+
+@pytest.fixture
+def proxy(dsn):
+ """Return a proxy to the --test-dsn database"""
+ p = Proxy(dsn)
+ yield p
+ p.stop()
+
+
+class Proxy:
+ """
+ Proxy a Postgres service for testing purposes.
+
+ Allow losing connectivity and restoring it using stop()/start().
+ """
+
+ def __init__(self, server_dsn):
+ cdict = conninfo.conninfo_to_dict(server_dsn)
+
+ # Get server params
+ self.server_port = cdict.get("port", "5432")
+ if "host" not in cdict or cdict["host"].startswith("/"):
+ self.server_host = "localhost"
+ else:
+ self.server_host = cdict["host"]
+
+ # Get client params
+ self.client_host = "localhost"
+ self.client_port = self._get_random_port()
+
+ # Make a connection string to the proxy
+ cdict["host"] = self.client_host
+ cdict["port"] = self.client_port
+ self.client_dsn = conninfo.make_conninfo(**cdict)
+
+ # The running proxy process
+ self.proc = None
+
+ def start(self):
+ if self.proc:
+ raise ValueError("proxy already running")
+
+ pproxy = which("pproxy")
+ if not pproxy:
+ raise ValueError("pproxy program not found")
+ cmdline = [pproxy, "--reuse"]
+ cmdline.extend(["-l", f"tunnel://:{self.client_port}"])
+ cmdline.extend(
+ ["-r", f"tunnel://{self.server_host}:{self.server_port}"]
+ )
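+ # pproxy tunnels raw TCP from the local port to the real server, so
+ # stopping and restarting the process simulates a network outage.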
+
+ self.proc = sp.Popen(cmdline, stdout=sp.DEVNULL)
+ self._wait_listen()
+
+ def stop(self):
+ if not self.proc:
+ return
+
+ self.proc.terminate()
+ self.proc.wait()
+ self.proc = None
+
+ @classmethod
+ def _get_random_port(cls):
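+ # Bind to port 0 so the OS picks a free ephemeral port (the socket is
+ # closed right away, so there is a small race before the proxy binds it).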
+ with socket.socket() as s:
+ s.bind(("", 0))
+ return s.getsockname()[1]
+
+ def _wait_listen(self):
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
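+ # Poll the proxy port for up to ~2 seconds until it accepts connections.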
+ for i in range(20):
+ if 0 == sock.connect_ex((self.client_host, self.client_port)):
+ return
+ time.sleep(0.1)
+ raise ValueError("the proxy didn't start")
import logging
import weakref
-from time import time, sleep
+from time import monotonic, sleep, time
from threading import Thread
import pytest
@pytest.mark.slow
def test_concurrent_filling(dsn, monkeypatch):
delay_connection(monkeypatch, 0.1)
- t0 = time()
+ t0 = monotonic()
p = pool.ConnectionPool(dsn, minconn=5, num_workers=2)
times = [item[1] - t0 for item in p._pool]
want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
assert t == pytest.approx(0.2, 0.1)
+@pytest.mark.slow
+def test_reconnect(proxy, caplog, monkeypatch):
+ caplog.set_level(logging.WARNING, logger="psycopg3.pool")
+
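+ # Check the backoff defaults, then speed the test up: first retry after
+ # 0.1 s with no jitter, so the retry timings are deterministic.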
+ assert pool.AddConnection.INITIAL_DELAY == 1.0
+ assert pool.AddConnection.DELAY_JITTER == 0.1
+ monkeypatch.setattr(pool.AddConnection, "INITIAL_DELAY", 0.1)
+ monkeypatch.setattr(pool.AddConnection, "DELAY_JITTER", 0.0)
+
+ proxy.start()
+ p = pool.ConnectionPool(proxy.client_dsn, minconn=1, timeout_sec=2)
+ proxy.stop()
+
+ with pytest.raises(psycopg3.OperationalError):
+ with p.connection() as conn:
+ conn.execute("select 1")
+
+ sleep(1.0)
+ proxy.start()
+ wait_pool_full(p)
+
+ with p.connection() as conn:
+ conn.execute("select 1")
+
+ assert "BAD" in caplog.messages[0]
+ times = [rec.created for rec in caplog.records]
+ assert times[1] - times[0] < 0.05
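+ # Further attempts should back off exponentially: ~0.1, 0.2, 0.4 s apart.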
+ deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
+ assert len(deltas) == 3
+ want = 0.1
+ for delta in deltas:
+ assert delta == pytest.approx(want, 0.05), deltas
+ want *= 2
+
+
+@pytest.mark.slow
+def test_reconnect_failure(proxy):
+ proxy.start()
+
+ t1 = None
+
+ def failed(pool):
+ assert pool.name == "this-one"
+ nonlocal t1
+ t1 = time()
+
+ p = pool.ConnectionPool(
+ proxy.client_dsn,
+ name="this-one",
+ minconn=1,
+ timeout_sec=2,
+ reconnect_timeout=1.0,
+ reconnect_failed=failed,
+ )
+ proxy.stop()
+
+ with pytest.raises(psycopg3.OperationalError):
+ with p.connection() as conn:
+ conn.execute("select 1")
+
+ t0 = time()
+ sleep(1.5)
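+ # After reconnect_timeout (1 s) the pool should give up, drop the
+ # connection slot and invoke the reconnect_failed callback.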
+ assert t1
+ assert t1 - t0 == pytest.approx(1.0, 0.1)
+ assert p._nconns == 0
+
+ proxy.start()
+ t0 = time()
+ with p.connection() as conn:
+ conn.execute("select 1")
+ t1 = time()
+ assert t1 - t0 < 0.2
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds