From: Daniele Varrazzo
Date: Mon, 22 Feb 2021 03:20:33 +0000 (+0100)
Subject: Use the connections in the pool uniformly
X-Git-Tag: 3.0.dev0~87^2~49
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ebbcab285a98e7eab56c08b6ef0c7cb5d3b943c7;p=thirdparty%2Fpsycopg.git

Use the connections in the pool uniformly

I feel this is a better policy than using some connections more than
others (e.g. in terms of the bloat of the resources associated with the
connections) and it gives more predictable connection performance
(there won't be some cold, some hot).

Now there aren't really "unused connections" to single out in order to
shrink the pool, so keep a tally of the minimum number of unused
connections and use a worker thread to close some if more than minconn
remained unused over a period.
---

diff --git a/psycopg3/psycopg3/pool.py b/psycopg3/psycopg3/pool.py
index e3879afb0..2e3b65578 100644
--- a/psycopg3/psycopg3/pool.py
+++ b/psycopg3/psycopg3/pool.py
@@ -10,7 +10,7 @@ import logging
 import threading
 from abc import ABC, abstractmethod
 from queue import Queue, Empty
-from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Tuple
+from typing import Any, Callable, Deque, Dict, Iterator, List, Optional
 from weakref import ref
 from contextlib import contextmanager
 from collections import deque
@@ -82,11 +82,18 @@ class ConnectionPool:
         self.num_workers = num_workers
 
         self._nconns = minconn  # currently in the pool, out, being prepared
-        self._pool: Deque[Tuple[Connection, float]] = deque()
+        self._pool: Deque[Connection] = deque()
         self._waiting: Deque["WaitingClient"] = deque()
         self._lock = threading.RLock()
         self._sched = Scheduler()
 
+        # Min number of connections in the pool in a max_idle unit of time.
+        # It is reset periodically by the ShrinkPool scheduled task.
+        # It is used to shrink back the pool if maxconn > minconn and extra
+        # connections have been acquired, if we notice that in the last
+        # max_idle interval they weren't all used.
+        self._nconns_min = minconn
+
         self._wqueue: "Queue[MaintenanceTask]" = Queue()
         self._workers: List[threading.Thread] = []
         for i in range(num_workers):
@@ -123,6 +130,11 @@
         for i in range(self._nconns):
             self.run_task(AddConnection(self))
 
+        # Schedule a task to shrink the pool if connections over minconn have
+        # remained unused. However if the pool cannot grow, don't bother.
+        if maxconn > minconn:
+            self.schedule_task(ShrinkPool(self), self.max_idle)
+
     def __repr__(self) -> str:
         return (
             f"<{self.__class__.__module__}.{self.__class__.__name__}"
@@ -167,7 +179,7 @@
         failing to do so will deplete the pool. A depleted pool is a sad pool:
         you don't want a depleted pool.
         """
-        logger.debug("connection requested to %r", self.name)
+        logger.info("connection requested to %r", self.name)
         # Critical section: decide here if there's a connection ready
         # or if the client needs to wait.
         with self._lock:
@@ -177,7 +189,9 @@ class ConnectionPool:
             pos: Optional[WaitingClient] = None
             if self._pool:
                 # Take a connection ready out of the pool
-                conn = self._pool.pop()[0]
+                conn = self._pool.popleft()
+                if len(self._pool) < self._nconns_min:
+                    self._nconns_min = len(self._pool)
             else:
                 # No connection available: put the client in the waiting queue
                 pos = WaitingClient()
@@ -185,8 +199,10 @@
 
             # If there is space for the pool to grow, let's do it
             if self._nconns < self.maxconn:
-                logger.debug("growing pool %r", self.name)
                 self._nconns += 1
+                logger.info(
+                    "growing pool %r to %s", self.name, self._nconns
+                )
                 self.run_task(AddConnection(self))
 
         # If we are in the waiting queue, wait to be assigned a connection
@@ -200,7 +216,7 @@
         # Note that this property shouldn't be set while the connection is in
         # the pool, to avoid to create a reference loop.
         conn._pool = self
-        logger.debug("connection given by %r", self.name)
+        logger.info("connection given by %r", self.name)
         return conn
 
     def putconn(self, conn: Connection) -> None:
@@ -220,7 +236,7 @@
                 f"can't return connection to pool {self.name!r}, {msg}: {conn}"
             )
 
-        logger.debug("returning connection to %r", self.name)
+        logger.info("returning connection to %r", self.name)
 
         # If the pool is closed just close the connection instead of returning
         # it to the pool. For extra refcare remove the pool reference from it.
@@ -264,23 +280,8 @@
                 break
 
         else:
-            now = time.monotonic()
-
             # No client waiting for a connection: put it back into the pool
-            self._pool.append((conn, now))
-
-            # Also check if it's time to shrink the pool
-            if (
-                self._nconns > self.minconn
-                and now - self._pool[0][1] > self.max_idle
-            ):
-                to_close, t0 = self._pool.popleft()
-                logger.debug(
-                    "shrinking pool %r after connection unused for %s sec",
-                    self.name,
-                    now - t0,
-                )
-                self._nconns -= 1
+            self._pool.append(conn)
 
         if to_close:
             to_close.close()
@@ -353,7 +354,7 @@
             pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
 
         # Close the connections still in the pool
-        for conn, _ in pool:
+        for conn in pool:
             conn.close()
 
         # Wait for the worker threads to terminate
@@ -431,7 +432,7 @@
 
 
 class WaitingClient:
-    """An position in a queue for a client waiting for a connection."""
+    """A position in a queue for a client waiting for a connection."""
 
     __slots__ = ("conn", "error", "_cond")
 
@@ -468,7 +469,7 @@
         """Signal the client waiting that a connection is ready.
 
         Return True if the client has "accepted" the connection, False
-        otherwise (typically because wait() has timed out.
+        otherwise (typically because wait() has timed out).
         """
         with self._cond:
             if self.conn or self.error:
@@ -482,7 +483,7 @@
         """Signal the client that, alas, they won't have a connection today.
 
         Return True if the client has "accepted" the error, False otherwise
-        (typically because wait() has timed out.
+        (typically because wait() has timed out).
         """
         with self._cond:
             if self.conn or self.error:
@@ -627,3 +628,39 @@ class ReturnConnection(MaintenanceTask):
 
     def _run(self, pool: ConnectionPool) -> None:
         pool._add_to_pool(self.conn)
+
+
+class ShrinkPool(MaintenanceTask):
+    """If the pool can shrink, remove one connection.
+
+    Re-schedule periodically and also reset the minimum number of connections
+    in the pool.
+    """
+
+    def _run(self, pool: ConnectionPool) -> None:
+        # Reschedule the task now so that in case of any error we don't lose
+        # the periodic run.
+        pool.schedule_task(self, pool.max_idle)
+
+        to_close: Optional[Connection] = None
+
+        with pool._lock:
+            # Reset the min number of connections used
+            nconns_min = pool._nconns_min
+            pool._nconns_min = len(pool._pool)
+
+            # If the pool can shrink and connections were unused, drop one
+            if pool._nconns > pool.minconn and nconns_min > 0:
+                to_close = pool._pool.popleft()
+                pool._nconns -= 1
+
+        if to_close:
+            logger.info(
+                "shrinking pool %r to %s because %s unused connections"
+                " in the last %s sec",
+                pool.name,
+                pool._nconns,
+                nconns_min,
+                pool.max_idle,
+            )
+            to_close.close()
diff --git a/tests/test_pool.py b/tests/test_pool.py
index f80f0a0c8..93d10580b 100644
--- a/tests/test_pool.py
+++ b/tests/test_pool.py
@@ -1,7 +1,8 @@
 import logging
 import weakref
-from time import monotonic, sleep, time
+from time import sleep, time
 from threading import Thread
+from collections import Counter
 
 import pytest
@@ -64,10 +65,20 @@ def test_connection_not_lost(dsn):
 @pytest.mark.slow
 def test_concurrent_filling(dsn, monkeypatch):
     delay_connection(monkeypatch, 0.1)
-    t0 = monotonic()
-    p = pool.ConnectionPool(dsn, minconn=5, num_workers=2)
-    times = [item[1] - t0 for item in p._pool]
+    t0 = time()
+    times = []
+
+    add_to_pool_orig = pool.ConnectionPool._add_to_pool
+
+    def _add_to_pool_time(self, conn):
+        times.append(time() - t0)
+        add_to_pool_orig(self, conn)
+
+    monkeypatch.setattr(pool.ConnectionPool, "_add_to_pool", _add_to_pool_time)
+
+    pool.ConnectionPool(dsn, minconn=5, num_workers=2)
     want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
+    assert len(times) == len(want_times)
     for got, want in zip(times, want_times):
         assert got == pytest.approx(want, 0.1), times
@@ -472,14 +483,24 @@ def test_grow(dsn, monkeypatch):
 
 @pytest.mark.slow
 def test_shrink(dsn, monkeypatch):
-    p = pool.ConnectionPool(
-        dsn, minconn=2, maxconn=4, num_workers=3, max_idle=0.2
-    )
+
+    orig_run = pool.ShrinkPool._run
+    results = []
+
+    def run_hacked(self, pool):
+        n0 = pool._nconns
+        orig_run(self, pool)
+        n1 = pool._nconns
+        results.append((n0, n1))
+
+    monkeypatch.setattr(pool.ShrinkPool, "_run", run_hacked)
+
+    p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, max_idle=0.2)
     assert p.max_idle == 0.2
 
     def worker(n):
         with p.connection() as conn:
-            conn.execute("select 1 from pg_sleep(0.2)")
+            conn.execute("select pg_sleep(0.1)")
 
     ts = []
     for i in range(4):
@@ -490,19 +511,8 @@
     for t in ts:
         t.join()
 
-    wait_pool_full(p)
-    assert len(p._pool) == 4
-
-    t0 = time()
-    t = None
-    while time() < t0 + 0.4:
-        with p.connection():
-            sleep(0.01)
-        if p._nconns < 4:
-            t = time() - t0
-            break
-
-    assert t == pytest.approx(0.2, 0.1)
+    sleep(1)
+    assert results == [(4, 4), (4, 3), (3, 2), (2, 2), (2, 2)]
 
 
 @pytest.mark.slow
@@ -579,6 +589,19 @@ def test_reconnect_failure(proxy):
     assert t1 - t0 < 0.2
 
 
+@pytest.mark.slow
+def test_uniform_use(dsn):
+    p = pool.ConnectionPool(dsn, minconn=4)
+    counts = Counter()
+    for i in range(8):
+        with p.connection() as conn:
+            sleep(0.1)
+            counts[id(conn)] += 1
+
+    assert len(counts) == 4
+    assert set(counts.values()) == set([2])
+
+
 def delay_connection(monkeypatch, sec):
     """
     Return a _connect_gen function delayed by the amount of seconds
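
The uniform-use policy above boils down to treating the pool deque as a
FIFO queue: popleft() hands out the least recently used connection and
append() puts it back at the other end, so connections rotate through the
clients. A minimal standalone sketch of the idea (plain strings stand in
for connections; this is not psycopg3 code):

    from collections import deque

    pool = deque(["c1", "c2", "c3"])
    used = []

    for _ in range(6):
        conn = pool.popleft()  # FIFO: take the least recently used connection
        used.append(conn)
        pool.append(conn)      # return it to the back of the queue

    # Every connection served the same number of requests (uniform use);
    # with pool.pop() (LIFO) "c3" would have served all six requests.
    assert used == ["c1", "c2", "c3", "c1", "c2", "c3"]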
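The shrink logic replaces the per-connection timestamps with a low
watermark: _nconns_min records the emptiest the idle queue got since the
last ShrinkPool run. If it never reached zero, at least that many
connections sat unused for a whole max_idle interval, so one of them can
be dropped. A standalone sketch of that bookkeeping, using a hypothetical
WatermarkPool class (not the psycopg3 API) with strings standing in for
connections:

    from collections import deque

    class WatermarkPool:
        def __init__(self, minconn, conns):
            self.minconn = minconn
            self.pool = deque(conns)      # idle connections
            self.nconns = len(conns)      # total managed connections
            self.nconns_min = len(conns)  # low watermark of idle connections

        def getconn(self):
            conn = self.pool.popleft()
            # Record a new low watermark if the idle queue got emptier than
            # it has ever been since the last shrink run.
            if len(self.pool) < self.nconns_min:
                self.nconns_min = len(self.pool)
            return conn

        def putconn(self, conn):
            self.pool.append(conn)

        def shrink(self):
            # In the patch this runs every max_idle seconds.
            nconns_min, self.nconns_min = self.nconns_min, len(self.pool)
            if self.nconns > self.minconn and nconns_min > 0:
                self.pool.popleft()  # a real pool would also close it
                self.nconns -= 1

    p = WatermarkPool(minconn=2, conns=["c1", "c2", "c3", "c4"])
    c = p.getconn()  # at most one connection was checked out at a time...
    p.putconn(c)
    p.shrink()       # ...so three sat idle the whole period: drop one
    assert p.nconns == 3

With no traffic at all the watermark stays at the full idle count, so each
period trims one more connection until minconn is reached: that is the
(4, 4), (4, 3), (3, 2), (2, 2), (2, 2) sequence test_shrink asserts, where
the first (4, 4) run is the busy interval in which the four workers drained
the pool and the watermark hit zero.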