# If the '_closed' property is not set we probably failed in __init__.
# Don't try anything complicated as probably it won't work.
if hasattr(self, "_closed"):
- self.close()
+ self.close(timeout=0)
@contextmanager
def connection(
"""`!True` if the pool is closed."""
return self._closed
- def close(self) -> None:
+ def close(self, timeout: float = 1.0) -> None:
"""Close the pool and make it unavailable to new clients.
All the waiting and future client will fail to acquire a connection
with a `PoolClosed` exception. Currently used connections will not be
closed until returned to the pool.
+
+ Wait *timeout* for threads to terminate their job, if positive.
"""
if self._closed:
return
# Stop the scheduler
self.sched.enter(0, None)
+ # Stop the worker threads
+ for i in range(len(self._workers)):
+ self.add_task(StopWorker(self))
+
# Signal to eventual clients in the queue that business is closed.
while self._waiting:
pos = self._waiting.popleft()
conn = self._pool.pop()[0]
conn.close()
- # Stop the worker threads
- for i in range(len(self._workers)):
- self.add_task(StopWorker(self))
+ # Wait for the worker threads to terminate
+ if timeout > 0:
+ for t in [self._sched_runner] + self._workers:
+ if not t.is_alive():
+ continue
+ t.join(timeout)
+ if t.is_alive():
+ logger.warning(
+ "couldn't stop thread %s in pool %r within %s seconds",
+ t,
+ self.name,
+ timeout,
+ )
def add_task(self, task: "MaintenanceTask") -> None:
"""Add a task to the queue of tasts to perform."""
assert "BAD" in caplog.records[2].message
+def test_close_no_threads(dsn):
+ p = pool.ConnectionPool(dsn)
+ assert p._sched_runner.is_alive()
+ for t in p._workers:
+ assert t.is_alive()
+
+ p.close()
+ assert not p._sched_runner.is_alive()
+ for t in p._workers:
+ assert not t.is_alive()
+
+
def test_putconn_no_pool(dsn):
p = pool.ConnectionPool(dsn, minconn=1)
conn = psycopg3.connect(dsn)
def test_del_no_warning(dsn, recwarn):
- p = pool.ConnectionPool(minconn=2)
+ p = pool.ConnectionPool(dsn, minconn=2)
with p.connection() as conn:
conn.execute("select 1")
assert not recwarn
+@pytest.mark.slow
+def test_del_stop_threads(dsn):
+ p = pool.ConnectionPool(dsn)
+ ts = [p._sched_runner] + p._workers
+ del p
+ sleep(0.2)
+ for t in ts:
+ assert not t.is_alive()
+
+
def test_closed_getconn(dsn):
p = pool.ConnectionPool(dsn, minconn=1)
assert not p.closed