From: Daniele Varrazzo Date: Sun, 21 Feb 2021 12:41:02 +0000 (+0100) Subject: Wait worker thread to stop on pool close X-Git-Tag: 3.0.dev0~87^2~54 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c9bbd9d3decf2af5aa02e23ffb34fa9750a96a85;p=thirdparty%2Fpsycopg.git Wait worker thread to stop on pool close --- diff --git a/psycopg3/psycopg3/pool.py b/psycopg3/psycopg3/pool.py index 9a83993b6..df333e650 100644 --- a/psycopg3/psycopg3/pool.py +++ b/psycopg3/psycopg3/pool.py @@ -133,7 +133,7 @@ class ConnectionPool: # If the '_closed' property is not set we probably failed in __init__. # Don't try anything complicated as probably it won't work. if hasattr(self, "_closed"): - self.close() + self.close(timeout=0) @contextmanager def connection( @@ -317,12 +317,14 @@ class ConnectionPool: """`!True` if the pool is closed.""" return self._closed - def close(self) -> None: + def close(self, timeout: float = 1.0) -> None: """Close the pool and make it unavailable to new clients. All the waiting and future client will fail to acquire a connection with a `PoolClosed` exception. Currently used connections will not be closed until returned to the pool. + + Wait *timeout* for threads to terminate their job, if positive. """ if self._closed: return @@ -336,6 +338,10 @@ class ConnectionPool: # Stop the scheduler self.sched.enter(0, None) + # Stop the worker threads + for i in range(len(self._workers)): + self.add_task(StopWorker(self)) + # Signal to eventual clients in the queue that business is closed. while self._waiting: pos = self._waiting.popleft() @@ -346,9 +352,19 @@ class ConnectionPool: conn = self._pool.pop()[0] conn.close() - # Stop the worker threads - for i in range(len(self._workers)): - self.add_task(StopWorker(self)) + # Wait for the worker threads to terminate + if timeout > 0: + for t in [self._sched_runner] + self._workers: + if not t.is_alive(): + continue + t.join(timeout) + if t.is_alive(): + logger.warning( + "couldn't stop thread %s in pool %r within %s seconds", + t, + self.name, + timeout, + ) def add_task(self, task: "MaintenanceTask") -> None: """Add a task to the queue of tasts to perform.""" diff --git a/tests/test_pool.py b/tests/test_pool.py index 14e68c0ec..cb6273a35 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -308,6 +308,18 @@ def test_fail_rollback_close(dsn, caplog, monkeypatch): assert "BAD" in caplog.records[2].message +def test_close_no_threads(dsn): + p = pool.ConnectionPool(dsn) + assert p._sched_runner.is_alive() + for t in p._workers: + assert t.is_alive() + + p.close() + assert not p._sched_runner.is_alive() + for t in p._workers: + assert not t.is_alive() + + def test_putconn_no_pool(dsn): p = pool.ConnectionPool(dsn, minconn=1) conn = psycopg3.connect(dsn) @@ -324,7 +336,7 @@ def test_putconn_wrong_pool(dsn): def test_del_no_warning(dsn, recwarn): - p = pool.ConnectionPool(minconn=2) + p = pool.ConnectionPool(dsn, minconn=2) with p.connection() as conn: conn.execute("select 1") @@ -336,6 +348,16 @@ def test_del_no_warning(dsn, recwarn): assert not recwarn +@pytest.mark.slow +def test_del_stop_threads(dsn): + p = pool.ConnectionPool(dsn) + ts = [p._sched_runner] + p._workers + del p + sleep(0.2) + for t in ts: + assert not t.is_alive() + + def test_closed_getconn(dsn): p = pool.ConnectionPool(dsn, minconn=1) assert not p.closed