import threading
from abc import ABC, abstractmethod
from queue import Queue, Empty
-from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Tuple
+from typing import Any, Callable, Deque, Dict, Iterator, List, Optional
from weakref import ref
from contextlib import contextmanager
from collections import deque
self.num_workers = num_workers
self._nconns = minconn # currently in the pool, out, being prepared
- self._pool: Deque[Tuple[Connection, float]] = deque()
+ self._pool: Deque[Connection] = deque()
self._waiting: Deque["WaitingClient"] = deque()
self._lock = threading.RLock()
self._sched = Scheduler()
+ # Min number of connections in the pool in a max_idle unit of time.
+ # It is reset periodically by the ShrinkPool scheduled task.
+ # It is used to shrink the pool back if maxconn > minconn and extra
+ # connections have been acquired, when we notice that in the last
+ # max_idle interval they weren't all used.
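+ # For example, with minconn=2 and maxconn=4: if 4 connections are open
+ # but the pool never ran out of idle connections during a whole
+ # max_idle interval (_nconns_min stayed > 0), ShrinkPool closes one of
+ # them; sustained low usage shrinks the pool back to minconn, one
+ # connection per interval.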
+ self._nconns_min = minconn
+
self._wqueue: "Queue[MaintenanceTask]" = Queue()
self._workers: List[threading.Thread] = []
for i in range(num_workers):
for i in range(self._nconns):
self.run_task(AddConnection(self))
+ # Schedule a task to shrink the pool if connections over minconn have
+ # remained unused. However, if the pool cannot grow, don't bother.
+ if maxconn > minconn:
+ self.schedule_task(ShrinkPool(self), self.max_idle)
+
def __repr__(self) -> str:
return (
f"<{self.__class__.__module__}.{self.__class__.__name__}"
failing to do so will deplete the pool. A depleted pool is a sad pool:
you don't want a depleted pool.
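+ For example, assuming the pool instance is bound to ``p``, a pattern
+ such as::
+
+     conn = p.getconn()
+     try:
+         conn.execute("select 1")
+     finally:
+         p.putconn(conn)
+
+ makes sure the connection goes back to the pool; the ``connection()``
+ context manager wraps the same pattern for you.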
"""
- logger.debug("connection requested to %r", self.name)
+ logger.info("connection requested to %r", self.name)
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
with self._lock:
pos: Optional[WaitingClient] = None
if self._pool:
# Take a connection ready out of the pool
- conn = self._pool.pop()[0]
+ conn = self._pool.popleft()
+ if len(self._pool) < self._nconns_min:
+ self._nconns_min = len(self._pool)
else:
# No connection available: put the client in the waiting queue
pos = WaitingClient()
# If there is space for the pool to grow, let's do it
if self._nconns < self.maxconn:
- logger.debug("growing pool %r", self.name)
self._nconns += 1
+ logger.info(
+ "growing pool %r to %s", self.name, self._nconns
+ )
self.run_task(AddConnection(self))
# If we are in the waiting queue, wait to be assigned a connection
# Note that this property shouldn't be set while the connection is in
# the pool, to avoid creating a reference loop.
conn._pool = self
- logger.debug("connection given by %r", self.name)
+ logger.info("connection given by %r", self.name)
return conn
def putconn(self, conn: Connection) -> None:
f"can't return connection to pool {self.name!r}, {msg}: {conn}"
)
- logger.debug("returning connection to %r", self.name)
+ logger.info("returning connection to %r", self.name)
# If the pool is closed just close the connection instead of returning
# it to the pool. For extra refcare remove the pool reference from it.
break
else:
- now = time.monotonic()
-
# No client waiting for a connection: put it back into the pool
- self._pool.append((conn, now))
-
- # Also check if it's time to shrink the pool
- if (
- self._nconns > self.minconn
- and now - self._pool[0][1] > self.max_idle
- ):
- to_close, t0 = self._pool.popleft()
- logger.debug(
- "shrinking pool %r after connection unused for %s sec",
- self.name,
- now - t0,
- )
- self._nconns -= 1
+ self._pool.append(conn)
if to_close:
to_close.close()
pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
# Close the connections still in the pool
- for conn, _ in pool:
+ for conn in pool:
conn.close()
# Wait for the worker threads to terminate
class WaitingClient:
- """An position in a queue for a client waiting for a connection."""
+ """A position in a queue for a client waiting for a connection."""
__slots__ = ("conn", "error", "_cond")
"""Signal the client waiting that a connection is ready.
Return True if the client has "accepted" the connection, False
- otherwise (typically because wait() has timed out.
+ otherwise (typically because wait() has timed out).
"""
with self._cond:
if self.conn or self.error:
"""Signal the client that, alas, they won't have a connection today.
Return True if the client has "accepted" the error, False otherwise
- (typically because wait() has timed out.
+ (typically because wait() has timed out).
"""
with self._cond:
if self.conn or self.error:
def _run(self, pool: ConnectionPool) -> None:
pool._add_to_pool(self.conn)
+
+
+class ShrinkPool(MaintenanceTask):
+ """If the pool can shrink, remove one connection.
+
+ Re-schedule periodically and also reset the minimum number of connections
+ in the pool.
+ """
+
+ def _run(self, pool: ConnectionPool) -> None:
+ # Reschedule the task now so that in case of any error we don't lose
+ # the periodic run.
+ pool.schedule_task(self, pool.max_idle)
+
+ to_close: Optional[Connection] = None
+
+ with pool._lock:
+ # Reset the min number of connections used
+ nconns_min = pool._nconns_min
+ pool._nconns_min = len(pool._pool)
+
+ # If the pool can shrink and connections were unused, drop one
+ if pool._nconns > pool.minconn and nconns_min > 0:
+ to_close = pool._pool.popleft()
+ pool._nconns -= 1
+
+ if to_close:
+ logger.info(
+ "shrinking pool %r to %s because %s unused connections"
+ " in the last %s sec",
+ pool.name,
+ pool._nconns,
+ nconns_min,
+ pool.max_idle,
+ )
+ to_close.close()
import logging
import weakref
-from time import monotonic, sleep, time
+from time import sleep, time
from threading import Thread
+from collections import Counter
import pytest
@pytest.mark.slow
def test_concurrent_filling(dsn, monkeypatch):
delay_connection(monkeypatch, 0.1)
- t0 = monotonic()
- p = pool.ConnectionPool(dsn, minconn=5, num_workers=2)
- times = [item[1] - t0 for item in p._pool]
+ t0 = time()
+ times = []
+
+ add_to_pool_orig = pool.ConnectionPool._add_to_pool
+
+ def _add_to_pool_time(self, conn):
+ times.append(time() - t0)
+ add_to_pool_orig(self, conn)
+
+ monkeypatch.setattr(pool.ConnectionPool, "_add_to_pool", _add_to_pool_time)
+
+ pool.ConnectionPool(dsn, minconn=5, num_workers=2)
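+ # Each connection creation is delayed by 0.1s and 2 workers prepare
+ # them in parallel, so the 5 connections should become ready in pairs
+ # at roughly 0.1, 0.2 and 0.3 seconds.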
want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
+ assert len(times) == len(want_times)
for got, want in zip(times, want_times):
assert got == pytest.approx(want, 0.1), times
@pytest.mark.slow
def test_shrink(dsn, monkeypatch):
- p = pool.ConnectionPool(
- dsn, minconn=2, maxconn=4, num_workers=3, max_idle=0.2
- )
+
+ orig_run = pool.ShrinkPool._run
+ results = []
+
+ def run_hacked(self, pool):
+ n0 = pool._nconns
+ orig_run(self, pool)
+ n1 = pool._nconns
+ results.append((n0, n1))
+
+ monkeypatch.setattr(pool.ShrinkPool, "_run", run_hacked)
+
+ p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, max_idle=0.2)
assert p.max_idle == 0.2
def worker(n):
with p.connection() as conn:
- conn.execute("select 1 from pg_sleep(0.2)")
+ conn.execute("select pg_sleep(0.1)")
ts = []
for i in range(4):
for t in ts:
t.join()
- wait_pool_full(p)
- assert len(p._pool) == 4
-
- t0 = time()
- t = None
- while time() < t0 + 0.4:
- with p.connection():
- sleep(0.01)
- if p._nconns < 4:
- t = time() - t0
- break
-
- assert t == pytest.approx(0.2, 0.1)
+ sleep(1)
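+ # ShrinkPool runs every max_idle (0.2s) while we sleep: a run closes
+ # one connection only if the pool stayed above minconn and never ran
+ # out of idle connections in the previous interval, so the pool
+ # shrinks from 4 back to 2 one connection at a time, then stays there.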
+ assert results == [(4, 4), (4, 3), (3, 2), (2, 2), (2, 2)]
@pytest.mark.slow
assert t1 - t0 < 0.2
+@pytest.mark.slow
+def test_uniform_use(dsn):
+ p = pool.ConnectionPool(dsn, minconn=4)
+ counts = Counter()
+ for i in range(8):
+ with p.connection() as conn:
+ sleep(0.1)
+ counts[id(conn)] += 1
+
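+ # getconn/putconn rotate connections FIFO (popleft/append), so 8
+ # sequential uses should touch all 4 connections exactly twice each.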
+ assert len(counts) == 4
+ assert set(counts.values()) == set([2])
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds