]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Wait worker thread to stop on pool close
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 21 Feb 2021 12:41:02 +0000 (13:41 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 12 Mar 2021 04:07:25 +0000 (05:07 +0100)
psycopg3/psycopg3/pool.py
tests/test_pool.py

index 9a83993b6094f98cd1daf2f32bfe06827b9a822e..df333e650c4f6590d2476250b494f101e8e77fa3 100644 (file)
@@ -133,7 +133,7 @@ class ConnectionPool:
         # If the '_closed' property is not set we probably failed in __init__.
         # Don't try anything complicated as probably it won't work.
         if hasattr(self, "_closed"):
-            self.close()
+            self.close(timeout=0)
 
     @contextmanager
     def connection(
@@ -317,12 +317,14 @@ class ConnectionPool:
         """`!True` if the pool is closed."""
         return self._closed
 
-    def close(self) -> None:
+    def close(self, timeout: float = 1.0) -> None:
         """Close the pool and make it unavailable to new clients.
 
         All the waiting and future client will fail to acquire a connection
         with a `PoolClosed` exception. Currently used connections will not be
         closed until returned to the pool.
+
+        Wait *timeout* for threads to terminate their job, if positive.
         """
         if self._closed:
             return
@@ -336,6 +338,10 @@ class ConnectionPool:
         # Stop the scheduler
         self.sched.enter(0, None)
 
+        # Stop the worker threads
+        for i in range(len(self._workers)):
+            self.add_task(StopWorker(self))
+
         # Signal to eventual clients in the queue that business is closed.
         while self._waiting:
             pos = self._waiting.popleft()
@@ -346,9 +352,19 @@ class ConnectionPool:
             conn = self._pool.pop()[0]
             conn.close()
 
-        # Stop the worker threads
-        for i in range(len(self._workers)):
-            self.add_task(StopWorker(self))
+        # Wait for the worker threads to terminate
+        if timeout > 0:
+            for t in [self._sched_runner] + self._workers:
+                if not t.is_alive():
+                    continue
+                t.join(timeout)
+                if t.is_alive():
+                    logger.warning(
+                        "couldn't stop thread %s in pool %r within %s seconds",
+                        t,
+                        self.name,
+                        timeout,
+                    )
 
     def add_task(self, task: "MaintenanceTask") -> None:
         """Add a task to the queue of tasts to perform."""
index 14e68c0ec98a335a083d3a6221055dc0e0188efb..cb6273a35b20111eef36ad27654394b83f8a2cf2 100644 (file)
@@ -308,6 +308,18 @@ def test_fail_rollback_close(dsn, caplog, monkeypatch):
     assert "BAD" in caplog.records[2].message
 
 
+def test_close_no_threads(dsn):
+    p = pool.ConnectionPool(dsn)
+    assert p._sched_runner.is_alive()
+    for t in p._workers:
+        assert t.is_alive()
+
+    p.close()
+    assert not p._sched_runner.is_alive()
+    for t in p._workers:
+        assert not t.is_alive()
+
+
 def test_putconn_no_pool(dsn):
     p = pool.ConnectionPool(dsn, minconn=1)
     conn = psycopg3.connect(dsn)
@@ -324,7 +336,7 @@ def test_putconn_wrong_pool(dsn):
 
 
 def test_del_no_warning(dsn, recwarn):
-    p = pool.ConnectionPool(minconn=2)
+    p = pool.ConnectionPool(dsn, minconn=2)
     with p.connection() as conn:
         conn.execute("select 1")
 
@@ -336,6 +348,16 @@ def test_del_no_warning(dsn, recwarn):
     assert not recwarn
 
 
+@pytest.mark.slow
+def test_del_stop_threads(dsn):
+    p = pool.ConnectionPool(dsn)
+    ts = [p._sched_runner] + p._workers
+    del p
+    sleep(0.2)
+    for t in ts:
+        assert not t.is_alive()
+
+
 def test_closed_getconn(dsn):
     p = pool.ConnectionPool(dsn, minconn=1)
     assert not p.closed