]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Shrink the pool when connections have been idle long enough
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 20 Feb 2021 03:16:27 +0000 (04:16 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 12 Mar 2021 04:07:25 +0000 (05:07 +0100)
Pool behaviour on start changed: block on __init__ until minconn
connections have been obtained or raise PoolTimeout if timeout_sec have
passed. Not doing so makes complicated to understand, when a connection
is requested, if it's done during initialisation, and avoid an unneeded
grow request.

psycopg3/psycopg3/pool.py
tests/test_pool.py

index e443801671331ce5427ced7d82e30c1376339b99..b9de95e41d23609cb50554b21988fb125681ba4f 100644 (file)
@@ -44,6 +44,7 @@ class ConnectionPool:
         maxconn: Optional[int] = None,
         name: Optional[str] = None,
         timeout_sec: float = 30.0,
+        max_idle_sec: float = 10 * 60.0,
         num_workers: int = 1,
     ):
         if maxconn is None:
@@ -69,10 +70,11 @@ class ConnectionPool:
         self.minconn = minconn
         self.maxconn = maxconn
         self.timeout_sec = timeout_sec
+        self.max_idle_sec = max_idle_sec
         self.num_workers = num_workers
 
-        self._nconns = 0  # currently in the pool, out, being prepared
-        self._pool: List[Tuple[Connection, float]] = []
+        self._nconns = minconn  # currently in the pool, out, being prepared
+        self._pool: Deque[Tuple[Connection, float]] = deque()
         self._waiting: Deque["WaitingClient"] = deque()
         self._lock = threading.Lock()
         self._closed = False
@@ -85,8 +87,17 @@ class ConnectionPool:
             t.start()
             self._workers.append(t)
 
-        # Run a task to create the connections immediately
-        self.add_task(TopUpConnections(self))
+        # Populate the pool with initial minconn connections
+        event = threading.Event()
+        for i in range(self._nconns):
+            self.add_task(AddInitialConnection(self, event))
+
+        # Wait for the pool to be full or throw an error
+        if not event.wait(timeout=timeout_sec):
+            self.close()  # stop all the threads
+            raise PoolTimeout(
+                f"pool initialization incomplete after {timeout_sec} sec"
+            )
 
     def __repr__(self) -> str:
         return (
@@ -136,7 +147,7 @@ class ConnectionPool:
             pos: Optional[WaitingClient] = None
             if self._pool:
                 # Take a connection ready out of the pool
-                conn = self._pool.pop(-1)[0]
+                conn = self._pool.pop()[0]
             else:
                 # No connection available: put the client in the waiting queue
                 pos = WaitingClient()
@@ -209,21 +220,41 @@ class ConnectionPool:
             self.add_task(AddConnection(self))
             return
 
+        pos: Optional[WaitingClient] = None
+        to_close: Optional[Connection] = None
+
         # Critical section: if there is a client waiting give it the connection
         # otherwise put it back into the pool.
         with self._lock:
-            pos: Optional[WaitingClient] = None
             if self._waiting:
                 # Extract the first client from the queue
                 pos = self._waiting.popleft()
             else:
+                now = time.time()
+
                 # No client waiting for a connection: put it back into the pool
-                self._pool.append((conn, time.time()))
+                self._pool.append((conn, now))
+
+                # Also check if it's time to shrink the pool
+                if (
+                    self._nconns > self.minconn
+                    and now - self._pool[0][1] > self.max_idle_sec
+                ):
+                    to_close, t0 = self._pool.popleft()
+                    logger.debug(
+                        "shrinking pool %r after connection unused for %s sec",
+                        self.name,
+                        now - t0,
+                    )
+                    self._nconns -= 1
 
         # If we found a client in queue, give it the connection and notify it
         if pos:
             pos.set(conn)
 
+        if to_close:
+            to_close.close()
+
     def _reset_transaction_status(self, conn: Connection) -> None:
         """
         Bring a connection to IDLE state or close it.
@@ -276,7 +307,7 @@ class ConnectionPool:
 
         # Close the connections still in the pool
         while self._pool:
-            conn = self._pool.pop(-1)[0]
+            conn = self._pool.pop()[0]
             conn.close()
 
         # Stop the worker threads
@@ -388,33 +419,28 @@ class StopWorker(MaintenanceTask):
         pass
 
 
-class TopUpConnections(MaintenanceTask):
-    """Increase the number of connections in the pool to the desired number."""
+class AddConnection(MaintenanceTask):
+    """Add a new connection into to the pool."""
 
     def _run(self) -> None:
-        with self.pool._lock:
-            # Check if there are new connections to create. If there are
-            # update the number of connections managed immediately and in
-            # the same critical section to avoid finding more than owed
-            nconns = self.pool._nconns
-            if nconns < self.pool.minconn:
-                newconns = self.pool.minconn - nconns
-                self.pool._nconns += newconns
-            else:
-                return
+        conn = self.pool._connect()
+        self.pool._add_to_pool(conn)
 
-        # enqueue connection creations command so that might be picked in
-        # parallel if possible
-        for i in range(newconns):
-            self.pool.add_task(AddConnection(self.pool))
 
+class AddInitialConnection(AddConnection):
+    """Add a new connection into to the pool.
 
-class AddConnection(MaintenanceTask):
-    """Add a new connection into to the pool."""
+    If the desired number of connections is reached notify the event.
+    """
+
+    def __init__(self, pool: ConnectionPool, event: threading.Event):
+        super().__init__(pool)
+        self.event = event
 
     def _run(self) -> None:
-        conn = self.pool._connect()
-        self.pool._add_to_pool(conn)
+        super()._run()
+        if len(self.pool._pool) >= self.pool._nconns:
+            self.event.set()
 
 
 class ReturnConnection(MaintenanceTask):
index 8a1bc106e3656c35aa4da0a813fc75d12ba09738..8e48ad74ebcd48903bf7dccee4c96442030b6c88 100644 (file)
@@ -55,13 +55,24 @@ def test_concurrent_filling(dsn, monkeypatch):
     delay_connection(monkeypatch, 0.1)
     t0 = time()
     p = pool.ConnectionPool(dsn, minconn=5, num_workers=2)
-    wait_pool_full(p)
     times = [item[1] - t0 for item in p._pool]
     want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
     for got, want in zip(times, want_times):
         assert got == pytest.approx(want, 0.1), times
 
 
+@pytest.mark.slow
+def test_init_timeout(dsn, monkeypatch):
+    delay_connection(monkeypatch, 0.1)
+    with pytest.raises(pool.PoolTimeout):
+        pool.ConnectionPool(dsn, minconn=4, num_workers=1, timeout_sec=0.3)
+
+    p = pool.ConnectionPool(dsn, minconn=4, num_workers=1, timeout_sec=0.5)
+    p.close()
+    p = pool.ConnectionPool(dsn, minconn=4, num_workers=2, timeout_sec=0.3)
+    p.close()
+
+
 @pytest.mark.slow
 def test_queue(dsn):
     p = pool.ConnectionPool(dsn, minconn=2)
@@ -354,7 +365,6 @@ def test_closed_queue(dsn):
 @pytest.mark.slow
 def test_grow(dsn, monkeypatch):
     p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, num_workers=3)
-    wait_pool_full(p)
     delay_connection(monkeypatch, 0.1)
     ts = []
     results = []
@@ -380,6 +390,46 @@ def test_grow(dsn, monkeypatch):
         assert got == pytest.approx(want, 0.15), times
 
 
+def test_default_max_idle_sec(dsn):
+    p = pool.ConnectionPool(dsn)
+    assert p.max_idle_sec == 600
+
+
+@pytest.mark.slow
+def test_shrink(dsn, monkeypatch):
+    p = pool.ConnectionPool(
+        dsn, minconn=2, maxconn=4, num_workers=3, max_idle_sec=0.2
+    )
+    assert p.max_idle_sec == 0.2
+
+    def worker(n):
+        with p.connection() as conn:
+            conn.execute("select 1 from pg_sleep(0.2)")
+
+    ts = []
+    for i in range(4):
+        t = Thread(target=worker, args=(i,))
+        t.start()
+        ts.append(t)
+
+    for t in ts:
+        t.join()
+
+    wait_pool_full(p)
+    assert len(p._pool) == 4
+
+    t0 = time()
+    t = None
+    while time() < t0 + 0.4:
+        with p.connection():
+            sleep(0.01)
+            if p._nconns < 4:
+                t = time() - t0
+                break
+
+    assert t == pytest.approx(0.2, 0.1)
+
+
 def delay_connection(monkeypatch, sec):
     """
     Return a _connect_gen function delayed by the amount of seconds
@@ -397,5 +447,5 @@ def delay_connection(monkeypatch, sec):
 
 
 def wait_pool_full(pool):
-    while len(pool._pool) < pool.minconn:
+    while len(pool._pool) < pool._nconns:
         sleep(0.01)