maxconn: Optional[int] = None,
name: Optional[str] = None,
timeout_sec: float = 30.0,
+ max_idle_sec: float = 10 * 60.0,
num_workers: int = 1,
):
if maxconn is None:
self.minconn = minconn
self.maxconn = maxconn
self.timeout_sec = timeout_sec
+ self.max_idle_sec = max_idle_sec
self.num_workers = num_workers
- self._nconns = 0 # currently in the pool, out, being prepared
- self._pool: List[Tuple[Connection, float]] = []
+ self._nconns = minconn # currently in the pool, out, being prepared
+ self._pool: Deque[Tuple[Connection, float]] = deque()
self._waiting: Deque["WaitingClient"] = deque()
self._lock = threading.Lock()
self._closed = False
t.start()
self._workers.append(t)
- # Run a task to create the connections immediately
- self.add_task(TopUpConnections(self))
+ # Populate the pool with initial minconn connections
+ event = threading.Event()
+ for i in range(self._nconns):
+ self.add_task(AddInitialConnection(self, event))
+
+ # Wait for the pool to be full or throw an error
+ if not event.wait(timeout=timeout_sec):
+ self.close() # stop all the threads
+ raise PoolTimeout(
+ f"pool initialization incomplete after {timeout_sec} sec"
+ )
def __repr__(self) -> str:
return (
pos: Optional[WaitingClient] = None
if self._pool:
# Take a connection ready out of the pool
- conn = self._pool.pop(-1)[0]
+ conn = self._pool.pop()[0]
else:
# No connection available: put the client in the waiting queue
pos = WaitingClient()
self.add_task(AddConnection(self))
return
+ pos: Optional[WaitingClient] = None
+ to_close: Optional[Connection] = None
+
# Critical section: if there is a client waiting give it the connection
# otherwise put it back into the pool.
with self._lock:
- pos: Optional[WaitingClient] = None
if self._waiting:
# Extract the first client from the queue
pos = self._waiting.popleft()
else:
+ now = time.time()
+
# No client waiting for a connection: put it back into the pool
- self._pool.append((conn, time.time()))
+ self._pool.append((conn, now))
+
+ # Also check if it's time to shrink the pool
+ if (
+ self._nconns > self.minconn
+ and now - self._pool[0][1] > self.max_idle_sec
+ ):
+ to_close, t0 = self._pool.popleft()
+ logger.debug(
+ "shrinking pool %r after connection unused for %s sec",
+ self.name,
+ now - t0,
+ )
+ self._nconns -= 1
# If we found a client in queue, give it the connection and notify it
if pos:
pos.set(conn)
+ if to_close:
+ to_close.close()
+
def _reset_transaction_status(self, conn: Connection) -> None:
"""
Bring a connection to IDLE state or close it.
# Close the connections still in the pool
while self._pool:
- conn = self._pool.pop(-1)[0]
+ conn = self._pool.pop()[0]
conn.close()
# Stop the worker threads
pass
-class TopUpConnections(MaintenanceTask):
- """Increase the number of connections in the pool to the desired number."""
+class AddConnection(MaintenanceTask):
+ """Add a new connection into to the pool."""
def _run(self) -> None:
- with self.pool._lock:
- # Check if there are new connections to create. If there are
- # update the number of connections managed immediately and in
- # the same critical section to avoid finding more than owed
- nconns = self.pool._nconns
- if nconns < self.pool.minconn:
- newconns = self.pool.minconn - nconns
- self.pool._nconns += newconns
- else:
- return
+ conn = self.pool._connect()
+ self.pool._add_to_pool(conn)
- # enqueue connection creations command so that might be picked in
- # parallel if possible
- for i in range(newconns):
- self.pool.add_task(AddConnection(self.pool))
+class AddInitialConnection(AddConnection):
+ """Add a new connection into to the pool.
-class AddConnection(MaintenanceTask):
- """Add a new connection into to the pool."""
+ If the desired number of connections is reached notify the event.
+ """
+
+ def __init__(self, pool: ConnectionPool, event: threading.Event):
+ super().__init__(pool)
+ self.event = event
def _run(self) -> None:
- conn = self.pool._connect()
- self.pool._add_to_pool(conn)
+ super()._run()
+ if len(self.pool._pool) >= self.pool._nconns:
+ self.event.set()
class ReturnConnection(MaintenanceTask):
delay_connection(monkeypatch, 0.1)
t0 = time()
p = pool.ConnectionPool(dsn, minconn=5, num_workers=2)
- wait_pool_full(p)
times = [item[1] - t0 for item in p._pool]
want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
for got, want in zip(times, want_times):
assert got == pytest.approx(want, 0.1), times
+@pytest.mark.slow
+def test_init_timeout(dsn, monkeypatch):
+ delay_connection(monkeypatch, 0.1)
+ with pytest.raises(pool.PoolTimeout):
+ pool.ConnectionPool(dsn, minconn=4, num_workers=1, timeout_sec=0.3)
+
+ p = pool.ConnectionPool(dsn, minconn=4, num_workers=1, timeout_sec=0.5)
+ p.close()
+ p = pool.ConnectionPool(dsn, minconn=4, num_workers=2, timeout_sec=0.3)
+ p.close()
+
+
@pytest.mark.slow
def test_queue(dsn):
p = pool.ConnectionPool(dsn, minconn=2)
@pytest.mark.slow
def test_grow(dsn, monkeypatch):
p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, num_workers=3)
- wait_pool_full(p)
delay_connection(monkeypatch, 0.1)
ts = []
results = []
assert got == pytest.approx(want, 0.15), times
+def test_default_max_idle_sec(dsn):
+ p = pool.ConnectionPool(dsn)
+ assert p.max_idle_sec == 600
+
+
+@pytest.mark.slow
+def test_shrink(dsn, monkeypatch):
+ p = pool.ConnectionPool(
+ dsn, minconn=2, maxconn=4, num_workers=3, max_idle_sec=0.2
+ )
+ assert p.max_idle_sec == 0.2
+
+ def worker(n):
+ with p.connection() as conn:
+ conn.execute("select 1 from pg_sleep(0.2)")
+
+ ts = []
+ for i in range(4):
+ t = Thread(target=worker, args=(i,))
+ t.start()
+ ts.append(t)
+
+ for t in ts:
+ t.join()
+
+ wait_pool_full(p)
+ assert len(p._pool) == 4
+
+ t0 = time()
+ t = None
+ while time() < t0 + 0.4:
+ with p.connection():
+ sleep(0.01)
+ if p._nconns < 4:
+ t = time() - t0
+ break
+
+ assert t == pytest.approx(0.2, 0.1)
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds
def wait_pool_full(pool):
- while len(pool._pool) < pool.minconn:
+ while len(pool._pool) < pool._nconns:
sleep(0.01)