maxconn: Optional[int] = None,
name: Optional[str] = None,
timeout: float = 30.0,
- setup_timeout: float = 30.0,
max_idle: float = 10 * 60.0,
reconnect_timeout: float = 5 * 60.0,
reconnect_failed: Optional[Callable[["ConnectionPool"], None]] = None,
# max_idle interval they weren't all used.
self._nconns_min = minconn
+ # to notify that the pool is full
+ self._pool_full_event: Optional[threading.Event] = None
+
self._tasks: "Queue[tasks.MaintenanceTask]" = Queue()
self._workers: List[threading.Thread] = []
for i in range(num_workers):
for t in self._workers:
t.start()
- # Populate the pool with initial minconn connections
- # Block if setup_timeout is > 0, otherwise fill the pool in background
- if setup_timeout > 0:
- event = threading.Event()
- for i in range(self._nconns):
- self.run_task(tasks.AddInitialConnection(self, event))
-
- # Wait for the pool to be full or throw an error
- if not event.wait(timeout=setup_timeout):
- self.close() # stop all the threads
- raise PoolTimeout(
- f"pool initialization incomplete after {setup_timeout} sec"
- )
- else:
- for i in range(self._nconns):
- self.run_task(tasks.AddConnection(self))
+ # Populate the pool with initial minconn connections in background
+ for i in range(self._nconns):
+ self.run_task(tasks.AddConnection(self))
# Schedule a task to shrink the pool if connections over minconn have
# remained unused. However if the pool cannot't grow don't bother.
if hasattr(self, "_closed"):
self.close(timeout=0)
+ def wait_ready(self, timeout: float = 30.0) -> None:
+ """
+ Wait for the pool to be full after init.
+
+ Raise `PoolTimeout` if not ready within *timeout* sec.
+ """
+ with self._lock:
+ assert not self._pool_full_event
+ if len(self._pool) >= self._nconns:
+ return
+ self._pool_full_event = threading.Event()
+
+ if not self._pool_full_event.wait(timeout):
+ self.close() # stop all the threads
+ raise PoolTimeout(
+ f"pool initialization incomplete after {timeout} sec"
+ )
+
+ with self._lock:
+ self._pool_full_event = None
+
@contextmanager
def connection(
self, timeout: Optional[float] = None
conn._pool = self
return conn
- def _add_initial_connection(self, event: threading.Event) -> None:
- """Create a new connection at the beginning of the pool life.
-
- Trigger *event* if all the connections necessary have been added.
- """
- conn = self._connect()
- conn._pool = None # avoid a reference loop
-
- with self._lock:
- assert (
- not self._waiting
- ), "clients waiting in a pool being initialised"
- self._pool.append(conn)
- trigger_event = len(self._pool) >= self._nconns
-
- if trigger_event:
- event.set()
-
def _add_connection(self, attempt: Optional[ConnectionAttempt]) -> None:
"""Try to connect and add the connection to the pool.
# No client waiting for a connection: put it back into the pool
self._pool.append(conn)
+ # If we have been asked to wait for pool init, notify the
+ # waiter if the pool is full.
+ if self._pool_full_event and len(self._pool) >= self._nconns:
+ self._pool_full_event.set()
+
def _reset_connection(self, conn: Connection) -> None:
"""
Bring a connection to IDLE state or close it.
# Copyright (C) 2021 The Psycopg Team
import logging
-import threading
from abc import ABC, abstractmethod
from typing import Optional, TYPE_CHECKING
from weakref import ref
pass
-class AddInitialConnection(MaintenanceTask):
- """Add a new connection into to the pool.
-
- If the desired number of connections is reached notify the event.
- """
-
- def __init__(self, pool: "ConnectionPool", event: threading.Event):
- super().__init__(pool)
- self.event = event
-
- def _run(self, pool: "ConnectionPool") -> None:
- pool._add_initial_connection(self.event)
-
-
class AddConnection(MaintenanceTask):
def __init__(
self,
t0 = time()
times = []
- add_orig = pool.ConnectionPool._add_initial_connection
+ add_orig = pool.ConnectionPool._add_to_pool
- def add_time(self, event):
- add_orig(self, event)
+ def add_time(self, conn):
times.append(time() - t0)
+ add_orig(self, conn)
- monkeypatch.setattr(
- pool.ConnectionPool, "_add_initial_connection", add_time
- )
+ monkeypatch.setattr(pool.ConnectionPool, "_add_to_pool", add_time)
- pool.ConnectionPool(dsn, minconn=5, num_workers=2)
+ p = pool.ConnectionPool(dsn, minconn=5, num_workers=2)
+ p.wait_ready(5.0)
want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
assert len(times) == len(want_times)
for got, want in zip(times, want_times):
@pytest.mark.slow
-def test_setup_timeout(dsn, monkeypatch):
+def test_wait_ready(dsn, monkeypatch):
delay_connection(monkeypatch, 0.1)
with pytest.raises(pool.PoolTimeout):
- pool.ConnectionPool(dsn, minconn=4, num_workers=1, setup_timeout=0.3)
+ p = pool.ConnectionPool(dsn, minconn=4, num_workers=1)
+ p.wait_ready(0.3)
- p = pool.ConnectionPool(dsn, minconn=4, num_workers=1, setup_timeout=0.5)
+ p = pool.ConnectionPool(dsn, minconn=4, num_workers=1)
+ p.wait_ready(0.5)
p.close()
- p = pool.ConnectionPool(dsn, minconn=4, num_workers=2, setup_timeout=0.3)
+ p = pool.ConnectionPool(dsn, minconn=4, num_workers=2)
+ p.wait_ready(0.3)
+ p.wait_ready(0.0001) # idempotent
p.close()
@pytest.mark.slow
def test_setup_no_timeout(dsn, proxy):
with pytest.raises(pool.PoolTimeout):
- pool.ConnectionPool(
- proxy.client_dsn, minconn=1, num_workers=1, setup_timeout=0.2
- )
+ p = pool.ConnectionPool(proxy.client_dsn, minconn=1, num_workers=1)
+ p.wait_ready(0.2)
- p = pool.ConnectionPool(
- proxy.client_dsn, minconn=1, num_workers=1, setup_timeout=0
- )
+ p = pool.ConnectionPool(proxy.client_dsn, minconn=1, num_workers=1)
sleep(0.5)
assert not p._pool
proxy.start()
with p.connection() as conn:
conn.execute("select 1")
- wait_pool_full(p)
-
+ p.wait_ready()
ref = weakref.ref(p)
del p
assert not ref()
@pytest.mark.slow
def test_grow(dsn, monkeypatch):
p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, num_workers=3)
+ p.wait_ready(5.0)
delay_connection(monkeypatch, 0.1)
ts = []
results = []
monkeypatch.setattr(ShrinkPool, "_run", run_hacked)
p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, max_idle=0.2)
+ p.wait_ready(5.0)
assert p.max_idle == 0.2
def worker(n):
monkeypatch.setattr(pool.pool.ConnectionAttempt, "DELAY_JITTER", 0.0)
proxy.start()
- p = pool.ConnectionPool(proxy.client_dsn, minconn=1, setup_timeout=2.0)
+ p = pool.ConnectionPool(proxy.client_dsn, minconn=1)
+ p.wait_ready(2.0)
proxy.stop()
with pytest.raises(psycopg3.OperationalError):
sleep(1.0)
proxy.start()
- wait_pool_full(p)
+ p.wait_ready()
with p.connection() as conn:
conn.execute("select 1")
proxy.client_dsn,
name="this-one",
minconn=1,
- setup_timeout=2.0,
reconnect_timeout=1.0,
reconnect_failed=failed,
)
+ p.wait_ready(2.0)
proxy.stop()
with pytest.raises(psycopg3.OperationalError):
return rv
monkeypatch.setattr(psycopg3.Connection, "connect", connect_delay)
-
-
-def wait_pool_full(pool):
- while len(pool._pool) < pool._nconns:
- sleep(0.01)