From: Daniele Varrazzo Date: Sat, 20 Feb 2021 03:16:27 +0000 (+0100) Subject: Shrink the pool when connections have been idle long enough X-Git-Tag: 3.0.dev0~87^2~65 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=300f13af263e93326c20b0b063b55cc718c7401a;p=thirdparty%2Fpsycopg.git Shrink the pool when connections have been idle long enough Pool behaviour on start changed: block on __init__ until minconn connections have been obtained or raise PoolTimeout if timeout_sec have passed. Not doing so makes complicated to understand, when a connection is requested, if it's done during initialisation, and avoid an unneeded grow request. --- diff --git a/psycopg3/psycopg3/pool.py b/psycopg3/psycopg3/pool.py index e44380167..b9de95e41 100644 --- a/psycopg3/psycopg3/pool.py +++ b/psycopg3/psycopg3/pool.py @@ -44,6 +44,7 @@ class ConnectionPool: maxconn: Optional[int] = None, name: Optional[str] = None, timeout_sec: float = 30.0, + max_idle_sec: float = 10 * 60.0, num_workers: int = 1, ): if maxconn is None: @@ -69,10 +70,11 @@ class ConnectionPool: self.minconn = minconn self.maxconn = maxconn self.timeout_sec = timeout_sec + self.max_idle_sec = max_idle_sec self.num_workers = num_workers - self._nconns = 0 # currently in the pool, out, being prepared - self._pool: List[Tuple[Connection, float]] = [] + self._nconns = minconn # currently in the pool, out, being prepared + self._pool: Deque[Tuple[Connection, float]] = deque() self._waiting: Deque["WaitingClient"] = deque() self._lock = threading.Lock() self._closed = False @@ -85,8 +87,17 @@ class ConnectionPool: t.start() self._workers.append(t) - # Run a task to create the connections immediately - self.add_task(TopUpConnections(self)) + # Populate the pool with initial minconn connections + event = threading.Event() + for i in range(self._nconns): + self.add_task(AddInitialConnection(self, event)) + + # Wait for the pool to be full or throw an error + if not event.wait(timeout=timeout_sec): + self.close() # stop all the threads + raise PoolTimeout( + f"pool initialization incomplete after {timeout_sec} sec" + ) def __repr__(self) -> str: return ( @@ -136,7 +147,7 @@ class ConnectionPool: pos: Optional[WaitingClient] = None if self._pool: # Take a connection ready out of the pool - conn = self._pool.pop(-1)[0] + conn = self._pool.pop()[0] else: # No connection available: put the client in the waiting queue pos = WaitingClient() @@ -209,21 +220,41 @@ class ConnectionPool: self.add_task(AddConnection(self)) return + pos: Optional[WaitingClient] = None + to_close: Optional[Connection] = None + # Critical section: if there is a client waiting give it the connection # otherwise put it back into the pool. with self._lock: - pos: Optional[WaitingClient] = None if self._waiting: # Extract the first client from the queue pos = self._waiting.popleft() else: + now = time.time() + # No client waiting for a connection: put it back into the pool - self._pool.append((conn, time.time())) + self._pool.append((conn, now)) + + # Also check if it's time to shrink the pool + if ( + self._nconns > self.minconn + and now - self._pool[0][1] > self.max_idle_sec + ): + to_close, t0 = self._pool.popleft() + logger.debug( + "shrinking pool %r after connection unused for %s sec", + self.name, + now - t0, + ) + self._nconns -= 1 # If we found a client in queue, give it the connection and notify it if pos: pos.set(conn) + if to_close: + to_close.close() + def _reset_transaction_status(self, conn: Connection) -> None: """ Bring a connection to IDLE state or close it. @@ -276,7 +307,7 @@ class ConnectionPool: # Close the connections still in the pool while self._pool: - conn = self._pool.pop(-1)[0] + conn = self._pool.pop()[0] conn.close() # Stop the worker threads @@ -388,33 +419,28 @@ class StopWorker(MaintenanceTask): pass -class TopUpConnections(MaintenanceTask): - """Increase the number of connections in the pool to the desired number.""" +class AddConnection(MaintenanceTask): + """Add a new connection into to the pool.""" def _run(self) -> None: - with self.pool._lock: - # Check if there are new connections to create. If there are - # update the number of connections managed immediately and in - # the same critical section to avoid finding more than owed - nconns = self.pool._nconns - if nconns < self.pool.minconn: - newconns = self.pool.minconn - nconns - self.pool._nconns += newconns - else: - return + conn = self.pool._connect() + self.pool._add_to_pool(conn) - # enqueue connection creations command so that might be picked in - # parallel if possible - for i in range(newconns): - self.pool.add_task(AddConnection(self.pool)) +class AddInitialConnection(AddConnection): + """Add a new connection into to the pool. -class AddConnection(MaintenanceTask): - """Add a new connection into to the pool.""" + If the desired number of connections is reached notify the event. + """ + + def __init__(self, pool: ConnectionPool, event: threading.Event): + super().__init__(pool) + self.event = event def _run(self) -> None: - conn = self.pool._connect() - self.pool._add_to_pool(conn) + super()._run() + if len(self.pool._pool) >= self.pool._nconns: + self.event.set() class ReturnConnection(MaintenanceTask): diff --git a/tests/test_pool.py b/tests/test_pool.py index 8a1bc106e..8e48ad74e 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -55,13 +55,24 @@ def test_concurrent_filling(dsn, monkeypatch): delay_connection(monkeypatch, 0.1) t0 = time() p = pool.ConnectionPool(dsn, minconn=5, num_workers=2) - wait_pool_full(p) times = [item[1] - t0 for item in p._pool] want_times = [0.1, 0.1, 0.2, 0.2, 0.3] for got, want in zip(times, want_times): assert got == pytest.approx(want, 0.1), times +@pytest.mark.slow +def test_init_timeout(dsn, monkeypatch): + delay_connection(monkeypatch, 0.1) + with pytest.raises(pool.PoolTimeout): + pool.ConnectionPool(dsn, minconn=4, num_workers=1, timeout_sec=0.3) + + p = pool.ConnectionPool(dsn, minconn=4, num_workers=1, timeout_sec=0.5) + p.close() + p = pool.ConnectionPool(dsn, minconn=4, num_workers=2, timeout_sec=0.3) + p.close() + + @pytest.mark.slow def test_queue(dsn): p = pool.ConnectionPool(dsn, minconn=2) @@ -354,7 +365,6 @@ def test_closed_queue(dsn): @pytest.mark.slow def test_grow(dsn, monkeypatch): p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, num_workers=3) - wait_pool_full(p) delay_connection(monkeypatch, 0.1) ts = [] results = [] @@ -380,6 +390,46 @@ def test_grow(dsn, monkeypatch): assert got == pytest.approx(want, 0.15), times +def test_default_max_idle_sec(dsn): + p = pool.ConnectionPool(dsn) + assert p.max_idle_sec == 600 + + +@pytest.mark.slow +def test_shrink(dsn, monkeypatch): + p = pool.ConnectionPool( + dsn, minconn=2, maxconn=4, num_workers=3, max_idle_sec=0.2 + ) + assert p.max_idle_sec == 0.2 + + def worker(n): + with p.connection() as conn: + conn.execute("select 1 from pg_sleep(0.2)") + + ts = [] + for i in range(4): + t = Thread(target=worker, args=(i,)) + t.start() + ts.append(t) + + for t in ts: + t.join() + + wait_pool_full(p) + assert len(p._pool) == 4 + + t0 = time() + t = None + while time() < t0 + 0.4: + with p.connection(): + sleep(0.01) + if p._nconns < 4: + t = time() - t0 + break + + assert t == pytest.approx(0.2, 0.1) + + def delay_connection(monkeypatch, sec): """ Return a _connect_gen function delayed by the amount of seconds @@ -397,5 +447,5 @@ def delay_connection(monkeypatch, sec): def wait_pool_full(pool): - while len(pool._pool) < pool.minconn: + while len(pool._pool) < pool._nconns: sleep(0.01)