]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Use the connections in the pool uniformly
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 22 Feb 2021 03:20:33 +0000 (04:20 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 12 Mar 2021 04:07:25 +0000 (05:07 +0100)
I feel this is a better use than using some more than other (e.g. in
term of bloat of the connections associated with the resources) and
gives a more predictable performance of the connection (there won't be
some cold, some hot).

Now there aren't really "unused connections" to single out in order to
shrink the pool. So keep a tally of the number of connections unused and
use a worker thread to close some if there are above minconn unused in a
period.

psycopg3/psycopg3/pool.py
tests/test_pool.py

index e3879afb0218d3694a15b06e8298d31519de5d64..2e3b65578c108a12d2513b2c532d836b83daa949 100644 (file)
@@ -10,7 +10,7 @@ import logging
 import threading
 from abc import ABC, abstractmethod
 from queue import Queue, Empty
-from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Tuple
+from typing import Any, Callable, Deque, Dict, Iterator, List, Optional
 from weakref import ref
 from contextlib import contextmanager
 from collections import deque
@@ -82,11 +82,18 @@ class ConnectionPool:
         self.num_workers = num_workers
 
         self._nconns = minconn  # currently in the pool, out, being prepared
-        self._pool: Deque[Tuple[Connection, float]] = deque()
+        self._pool: Deque[Connection] = deque()
         self._waiting: Deque["WaitingClient"] = deque()
         self._lock = threading.RLock()
         self._sched = Scheduler()
 
+        # Min number of connections in the pool in a max_idle unit of time.
+        # It is reset periodically by the ShrinkPool scheduled task.
+        # It is used to shrink back the pool if maxcon > minconn and extra
+        # connections have been acquired, if we notice that in the last
+        # max_idle interval they weren't all used.
+        self._nconns_min = minconn
+
         self._wqueue: "Queue[MaintenanceTask]" = Queue()
         self._workers: List[threading.Thread] = []
         for i in range(num_workers):
@@ -123,6 +130,11 @@ class ConnectionPool:
             for i in range(self._nconns):
                 self.run_task(AddConnection(self))
 
+        # Schedule a task to shrink the pool if connections over minconn have
+        # remained unused. However if the pool cannot't grow don't bother.
+        if maxconn > minconn:
+            self.schedule_task(ShrinkPool(self), self.max_idle)
+
     def __repr__(self) -> str:
         return (
             f"<{self.__class__.__module__}.{self.__class__.__name__}"
@@ -167,7 +179,7 @@ class ConnectionPool:
         failing to do so will deplete the pool. A depleted pool is a sad pool:
         you don't want a depleted pool.
         """
-        logger.debug("connection requested to %r", self.name)
+        logger.info("connection requested to %r", self.name)
         # Critical section: decide here if there's a connection ready
         # or if the client needs to wait.
         with self._lock:
@@ -177,7 +189,9 @@ class ConnectionPool:
             pos: Optional[WaitingClient] = None
             if self._pool:
                 # Take a connection ready out of the pool
-                conn = self._pool.pop()[0]
+                conn = self._pool.popleft()
+                if len(self._pool) < self._nconns_min:
+                    self._nconns_min = len(self._pool)
             else:
                 # No connection available: put the client in the waiting queue
                 pos = WaitingClient()
@@ -185,8 +199,10 @@ class ConnectionPool:
 
                 # If there is space for the pool to grow, let's do it
                 if self._nconns < self.maxconn:
-                    logger.debug("growing pool %r", self.name)
                     self._nconns += 1
+                    logger.info(
+                        "growing pool %r to %s", self.name, self._nconns
+                    )
                     self.run_task(AddConnection(self))
 
         # If we are in the waiting queue, wait to be assigned a connection
@@ -200,7 +216,7 @@ class ConnectionPool:
         # Note that this property shouldn't be set while the connection is in
         # the pool, to avoid to create a reference loop.
         conn._pool = self
-        logger.debug("connection given by %r", self.name)
+        logger.info("connection given by %r", self.name)
         return conn
 
     def putconn(self, conn: Connection) -> None:
@@ -220,7 +236,7 @@ class ConnectionPool:
                 f"can't return connection to pool {self.name!r}, {msg}: {conn}"
             )
 
-        logger.debug("returning connection to %r", self.name)
+        logger.info("returning connection to %r", self.name)
 
         # If the pool is closed just close the connection instead of returning
         # it to the pool. For extra refcare remove the pool reference from it.
@@ -264,23 +280,8 @@ class ConnectionPool:
                     break
 
             else:
-                now = time.monotonic()
-
                 # No client waiting for a connection: put it back into the pool
-                self._pool.append((conn, now))
-
-                # Also check if it's time to shrink the pool
-                if (
-                    self._nconns > self.minconn
-                    and now - self._pool[0][1] > self.max_idle
-                ):
-                    to_close, t0 = self._pool.popleft()
-                    logger.debug(
-                        "shrinking pool %r after connection unused for %s sec",
-                        self.name,
-                        now - t0,
-                    )
-                    self._nconns -= 1
+                self._pool.append(conn)
 
         if to_close:
             to_close.close()
@@ -353,7 +354,7 @@ class ConnectionPool:
             pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
 
         # Close the connections still in the pool
-        for conn, _ in pool:
+        for conn in pool:
             conn.close()
 
         # Wait for the worker threads to terminate
@@ -431,7 +432,7 @@ class ConnectionPool:
 
 
 class WaitingClient:
-    """An position in a queue for a client waiting for a connection."""
+    """A position in a queue for a client waiting for a connection."""
 
     __slots__ = ("conn", "error", "_cond")
 
@@ -468,7 +469,7 @@ class WaitingClient:
         """Signal the client waiting that a connection is ready.
 
         Return True if the client has "accepted" the connection, False
-        otherwise (typically because wait() has timed out.
+        otherwise (typically because wait() has timed out).
         """
         with self._cond:
             if self.conn or self.error:
@@ -482,7 +483,7 @@ class WaitingClient:
         """Signal the client that, alas, they won't have a connection today.
 
         Return True if the client has "accepted" the error, False otherwise
-        (typically because wait() has timed out.
+        (typically because wait() has timed out).
         """
         with self._cond:
             if self.conn or self.error:
@@ -627,3 +628,39 @@ class ReturnConnection(MaintenanceTask):
 
     def _run(self, pool: ConnectionPool) -> None:
         pool._add_to_pool(self.conn)
+
+
+class ShrinkPool(MaintenanceTask):
+    """If the pool can shrink, remove one connection.
+
+    Re-schedule periodically and also reset the minimum number of connections
+    in the pool.
+    """
+
+    def _run(self, pool: ConnectionPool) -> None:
+        # Reschedule the task now so that in case of any error we don't lose
+        # the periodic run.
+        pool.schedule_task(self, pool.max_idle)
+
+        to_close: Optional[Connection] = None
+
+        with pool._lock:
+            # Reset the min number of connections used
+            nconns_min = pool._nconns_min
+            pool._nconns_min = len(pool._pool)
+
+            # If the pool can shrink and connections were unused, drop one
+            if pool._nconns > pool.minconn and nconns_min > 0:
+                to_close = pool._pool.popleft()
+                pool._nconns -= 1
+
+        if to_close:
+            logger.info(
+                "shrinking pool %r to %s because %s unused connections"
+                " in the last %s sec",
+                pool.name,
+                pool._nconns,
+                nconns_min,
+                pool.max_idle,
+            )
+            to_close.close()
index f80f0a0c82a99b7f35eb7e674d593694bedeea1b..93d10580ba1a01066fbf902edcc9a701f1e4d72f 100644 (file)
@@ -1,7 +1,8 @@
 import logging
 import weakref
-from time import monotonic, sleep, time
+from time import sleep, time
 from threading import Thread
+from collections import Counter
 
 import pytest
 
@@ -64,10 +65,20 @@ def test_connection_not_lost(dsn):
 @pytest.mark.slow
 def test_concurrent_filling(dsn, monkeypatch):
     delay_connection(monkeypatch, 0.1)
-    t0 = monotonic()
-    p = pool.ConnectionPool(dsn, minconn=5, num_workers=2)
-    times = [item[1] - t0 for item in p._pool]
+    t0 = time()
+    times = []
+
+    add_to_pool_orig = pool.ConnectionPool._add_to_pool
+
+    def _add_to_pool_time(self, conn):
+        times.append(time() - t0)
+        add_to_pool_orig(self, conn)
+
+    monkeypatch.setattr(pool.ConnectionPool, "_add_to_pool", _add_to_pool_time)
+
+    pool.ConnectionPool(dsn, minconn=5, num_workers=2)
     want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
+    assert len(times) == len(want_times)
     for got, want in zip(times, want_times):
         assert got == pytest.approx(want, 0.1), times
 
@@ -472,14 +483,24 @@ def test_grow(dsn, monkeypatch):
 
 @pytest.mark.slow
 def test_shrink(dsn, monkeypatch):
-    p = pool.ConnectionPool(
-        dsn, minconn=2, maxconn=4, num_workers=3, max_idle=0.2
-    )
+
+    orig_run = pool.ShrinkPool._run
+    results = []
+
+    def run_hacked(self, pool):
+        n0 = pool._nconns
+        orig_run(self, pool)
+        n1 = pool._nconns
+        results.append((n0, n1))
+
+    monkeypatch.setattr(pool.ShrinkPool, "_run", run_hacked)
+
+    p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, max_idle=0.2)
     assert p.max_idle == 0.2
 
     def worker(n):
         with p.connection() as conn:
-            conn.execute("select 1 from pg_sleep(0.2)")
+            conn.execute("select pg_sleep(0.1)")
 
     ts = []
     for i in range(4):
@@ -490,19 +511,8 @@ def test_shrink(dsn, monkeypatch):
     for t in ts:
         t.join()
 
-    wait_pool_full(p)
-    assert len(p._pool) == 4
-
-    t0 = time()
-    t = None
-    while time() < t0 + 0.4:
-        with p.connection():
-            sleep(0.01)
-            if p._nconns < 4:
-                t = time() - t0
-                break
-
-    assert t == pytest.approx(0.2, 0.1)
+    sleep(1)
+    assert results == [(4, 4), (4, 3), (3, 2), (2, 2), (2, 2)]
 
 
 @pytest.mark.slow
@@ -579,6 +589,19 @@ def test_reconnect_failure(proxy):
     assert t1 - t0 < 0.2
 
 
+@pytest.mark.slow
+def test_uniform_use(dsn):
+    p = pool.ConnectionPool(dsn, minconn=4)
+    counts = Counter()
+    for i in range(8):
+        with p.connection() as conn:
+            sleep(0.1)
+            counts[id(conn)] += 1
+
+    assert len(counts) == 4
+    assert set(counts.values()) == set([2])
+
+
 def delay_connection(monkeypatch, sec):
     """
     Return a _connect_gen function delayed by the amount of seconds