]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Add pool.wait_ready, drop connection_timeout on init
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 25 Feb 2021 20:57:20 +0000 (21:57 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 12 Mar 2021 04:07:25 +0000 (05:07 +0100)
This way init should never block. Always fill the connection in
background but a client can decide if they want to wait for it to be
ready.

psycopg3/psycopg3/pool/pool.py
psycopg3/psycopg3/pool/tasks.py
tests/pool/test_pool.py

index b6718f9f26e2955d9ff4514fcf8b19229bc68449..ce6282ed36929faca657b66bfaf1a6066b511fe4 100644 (file)
@@ -39,7 +39,6 @@ class ConnectionPool:
         maxconn: Optional[int] = None,
         name: Optional[str] = None,
         timeout: float = 30.0,
-        setup_timeout: float = 30.0,
         max_idle: float = 10 * 60.0,
         reconnect_timeout: float = 5 * 60.0,
         reconnect_failed: Optional[Callable[["ConnectionPool"], None]] = None,
@@ -87,6 +86,9 @@ class ConnectionPool:
         # max_idle interval they weren't all used.
         self._nconns_min = minconn
 
+        # to notify that the pool is full
+        self._pool_full_event: Optional[threading.Event] = None
+
         self._tasks: "Queue[tasks.MaintenanceTask]" = Queue()
         self._workers: List[threading.Thread] = []
         for i in range(num_workers):
@@ -108,22 +110,9 @@ class ConnectionPool:
         for t in self._workers:
             t.start()
 
-        # Populate the pool with initial minconn connections
-        # Block if setup_timeout is > 0, otherwise fill the pool in background
-        if setup_timeout > 0:
-            event = threading.Event()
-            for i in range(self._nconns):
-                self.run_task(tasks.AddInitialConnection(self, event))
-
-            # Wait for the pool to be full or throw an error
-            if not event.wait(timeout=setup_timeout):
-                self.close()  # stop all the threads
-                raise PoolTimeout(
-                    f"pool initialization incomplete after {setup_timeout} sec"
-                )
-        else:
-            for i in range(self._nconns):
-                self.run_task(tasks.AddConnection(self))
+        # Populate the pool with initial minconn connections in background
+        for i in range(self._nconns):
+            self.run_task(tasks.AddConnection(self))
 
         # Schedule a task to shrink the pool if connections over minconn have
         # remained unused. However if the pool cannot't grow don't bother.
@@ -142,6 +131,27 @@ class ConnectionPool:
         if hasattr(self, "_closed"):
             self.close(timeout=0)
 
+    def wait_ready(self, timeout: float = 30.0) -> None:
+        """
+        Wait for the pool to be full after init.
+
+        Raise `PoolTimeout` if not ready within *timeout* sec.
+        """
+        with self._lock:
+            assert not self._pool_full_event
+            if len(self._pool) >= self._nconns:
+                return
+            self._pool_full_event = threading.Event()
+
+        if not self._pool_full_event.wait(timeout):
+            self.close()  # stop all the threads
+            raise PoolTimeout(
+                f"pool initialization incomplete after {timeout} sec"
+            )
+
+        with self._lock:
+            self._pool_full_event = None
+
     @contextmanager
     def connection(
         self, timeout: Optional[float] = None
@@ -355,24 +365,6 @@ class ConnectionPool:
         conn._pool = self
         return conn
 
-    def _add_initial_connection(self, event: threading.Event) -> None:
-        """Create a new connection at the beginning of the pool life.
-
-        Trigger *event* if all the connections necessary have been added.
-        """
-        conn = self._connect()
-        conn._pool = None  # avoid a reference loop
-
-        with self._lock:
-            assert (
-                not self._waiting
-            ), "clients waiting in a pool being initialised"
-            self._pool.append(conn)
-            trigger_event = len(self._pool) >= self._nconns
-
-        if trigger_event:
-            event.set()
-
     def _add_connection(self, attempt: Optional[ConnectionAttempt]) -> None:
         """Try to connect and add the connection to the pool.
 
@@ -450,6 +442,11 @@ class ConnectionPool:
                 # No client waiting for a connection: put it back into the pool
                 self._pool.append(conn)
 
+                # If we have been asked to wait for pool init, notify the
+                # waiter if the pool is full.
+                if self._pool_full_event and len(self._pool) >= self._nconns:
+                    self._pool_full_event.set()
+
     def _reset_connection(self, conn: Connection) -> None:
         """
         Bring a connection to IDLE state or close it.
index fe1a11ba4daaed7e20ebc54db8445796d3473ea8..5b0690183dea047b20701231c6ef0c8cf140049a 100644 (file)
@@ -5,7 +5,6 @@ Maintenance tasks for the connection pools.
 # Copyright (C) 2021 The Psycopg Team
 
 import logging
-import threading
 from abc import ABC, abstractmethod
 from typing import Optional, TYPE_CHECKING
 from weakref import ref
@@ -69,20 +68,6 @@ class StopWorker(MaintenanceTask):
         pass
 
 
-class AddInitialConnection(MaintenanceTask):
-    """Add a new connection into to the pool.
-
-    If the desired number of connections is reached notify the event.
-    """
-
-    def __init__(self, pool: "ConnectionPool", event: threading.Event):
-        super().__init__(pool)
-        self.event = event
-
-    def _run(self, pool: "ConnectionPool") -> None:
-        pool._add_initial_connection(self.event)
-
-
 class AddConnection(MaintenanceTask):
     def __init__(
         self,
index 93c4cb37cdd64930dbc9c373774be6be71086882..87309b8eebc065d5a922ae63322b3f9f596b3f41 100644 (file)
@@ -68,17 +68,16 @@ def test_concurrent_filling(dsn, monkeypatch):
     t0 = time()
     times = []
 
-    add_orig = pool.ConnectionPool._add_initial_connection
+    add_orig = pool.ConnectionPool._add_to_pool
 
-    def add_time(self, event):
-        add_orig(self, event)
+    def add_time(self, conn):
         times.append(time() - t0)
+        add_orig(self, conn)
 
-    monkeypatch.setattr(
-        pool.ConnectionPool, "_add_initial_connection", add_time
-    )
+    monkeypatch.setattr(pool.ConnectionPool, "_add_to_pool", add_time)
 
-    pool.ConnectionPool(dsn, minconn=5, num_workers=2)
+    p = pool.ConnectionPool(dsn, minconn=5, num_workers=2)
+    p.wait_ready(5.0)
     want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
     assert len(times) == len(want_times)
     for got, want in zip(times, want_times):
@@ -86,27 +85,28 @@ def test_concurrent_filling(dsn, monkeypatch):
 
 
 @pytest.mark.slow
-def test_setup_timeout(dsn, monkeypatch):
+def test_wait_ready(dsn, monkeypatch):
     delay_connection(monkeypatch, 0.1)
     with pytest.raises(pool.PoolTimeout):
-        pool.ConnectionPool(dsn, minconn=4, num_workers=1, setup_timeout=0.3)
+        p = pool.ConnectionPool(dsn, minconn=4, num_workers=1)
+        p.wait_ready(0.3)
 
-    p = pool.ConnectionPool(dsn, minconn=4, num_workers=1, setup_timeout=0.5)
+    p = pool.ConnectionPool(dsn, minconn=4, num_workers=1)
+    p.wait_ready(0.5)
     p.close()
-    p = pool.ConnectionPool(dsn, minconn=4, num_workers=2, setup_timeout=0.3)
+    p = pool.ConnectionPool(dsn, minconn=4, num_workers=2)
+    p.wait_ready(0.3)
+    p.wait_ready(0.0001)  # idempotent
     p.close()
 
 
 @pytest.mark.slow
 def test_setup_no_timeout(dsn, proxy):
     with pytest.raises(pool.PoolTimeout):
-        pool.ConnectionPool(
-            proxy.client_dsn, minconn=1, num_workers=1, setup_timeout=0.2
-        )
+        p = pool.ConnectionPool(proxy.client_dsn, minconn=1, num_workers=1)
+        p.wait_ready(0.2)
 
-    p = pool.ConnectionPool(
-        proxy.client_dsn, minconn=1, num_workers=1, setup_timeout=0
-    )
+    p = pool.ConnectionPool(proxy.client_dsn, minconn=1, num_workers=1)
     sleep(0.5)
     assert not p._pool
     proxy.start()
@@ -382,8 +382,7 @@ def test_del_no_warning(dsn, recwarn):
     with p.connection() as conn:
         conn.execute("select 1")
 
-    wait_pool_full(p)
-
+    p.wait_ready()
     ref = weakref.ref(p)
     del p
     assert not ref()
@@ -458,6 +457,7 @@ def test_closed_queue(dsn):
 @pytest.mark.slow
 def test_grow(dsn, monkeypatch):
     p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, num_workers=3)
+    p.wait_ready(5.0)
     delay_connection(monkeypatch, 0.1)
     ts = []
     results = []
@@ -500,6 +500,7 @@ def test_shrink(dsn, monkeypatch):
     monkeypatch.setattr(ShrinkPool, "_run", run_hacked)
 
     p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, max_idle=0.2)
+    p.wait_ready(5.0)
     assert p.max_idle == 0.2
 
     def worker(n):
@@ -529,7 +530,8 @@ def test_reconnect(proxy, caplog, monkeypatch):
     monkeypatch.setattr(pool.pool.ConnectionAttempt, "DELAY_JITTER", 0.0)
 
     proxy.start()
-    p = pool.ConnectionPool(proxy.client_dsn, minconn=1, setup_timeout=2.0)
+    p = pool.ConnectionPool(proxy.client_dsn, minconn=1)
+    p.wait_ready(2.0)
     proxy.stop()
 
     with pytest.raises(psycopg3.OperationalError):
@@ -538,7 +540,7 @@ def test_reconnect(proxy, caplog, monkeypatch):
 
     sleep(1.0)
     proxy.start()
-    wait_pool_full(p)
+    p.wait_ready()
 
     with p.connection() as conn:
         conn.execute("select 1")
@@ -569,10 +571,10 @@ def test_reconnect_failure(proxy):
         proxy.client_dsn,
         name="this-one",
         minconn=1,
-        setup_timeout=2.0,
         reconnect_timeout=1.0,
         reconnect_failed=failed,
     )
+    p.wait_ready(2.0)
     proxy.stop()
 
     with pytest.raises(psycopg3.OperationalError):
@@ -620,8 +622,3 @@ def delay_connection(monkeypatch, sec):
         return rv
 
     monkeypatch.setattr(psycopg3.Connection, "connect", connect_delay)
-
-
-def wait_pool_full(pool):
-    while len(pool._pool) < pool._nconns:
-        sleep(0.01)