self._sched.enter(0, None)
# Stop the worker threads
- for i in range(len(self._workers)):
+ workers, self._workers = self._workers[:], []
+ for i in range(len(workers)):
self.run_task(StopWorker(self))
# Signal to eventual clients in the queue that business is closed.
# Wait for the worker threads to terminate
assert self._sched_runner is not None
+ sched_runner, self._sched_runner = self._sched_runner, None
if timeout > 0:
- for t in [self._sched_runner] + self._workers:
+ for t in [sched_runner] + workers:
if not t.is_alive():
continue
t.join(timeout)
self.name,
timeout,
)
- self._sched_runner = None
def __enter__(self) -> "ConnectionPool":
return self
await self._sched.enter(0, None)
# Stop the worker tasks
- for w in self._workers:
+ workers, self._workers = self._workers[:], []
+ for w in workers:
self.run_task(StopWorker(self))
# Signal to eventual clients in the queue that business is closed.
# Wait for the worker tasks to terminate
assert self._sched_runner is not None
- wait = asyncio.gather(self._sched_runner, *self._workers)
+ sched_runner, self._sched_runner = self._sched_runner, None
+ wait = asyncio.gather(sched_runner, *workers)
try:
if timeout > 0:
await asyncio.wait_for(asyncio.shield(wait), timeout=timeout)
self.name,
timeout,
)
- self._sched_runner = None
async def __aenter__(self) -> "AsyncConnectionPool":
return self
def test_close_no_threads(dsn):
p = pool.ConnectionPool(dsn)
assert p._sched_runner and p._sched_runner.is_alive()
- for t in p._workers:
+ workers = p._workers[:]
+ assert workers
+ for t in workers:
assert t.is_alive()
p.close()
assert p._sched_runner is None
- for t in p._workers:
+ assert not p._workers
+ for t in workers:
assert not t.is_alive()
assert len(success) == 2
+def test_reopen(dsn):
+ p = pool.ConnectionPool(dsn)
+ with p.connection() as conn:
+ conn.execute("select 1")
+ p.close()
+ assert p._sched_runner is None
+ assert not p._workers
+ p.open()
+ assert p._sched_runner is not None
+ assert p._workers
+ with p.connection() as conn:
+ conn.execute("select 1")
+ p.close()
+
+
@pytest.mark.slow
@pytest.mark.timing
def test_grow(dsn, monkeypatch, retries):
async def test_close_no_tasks(dsn):
p = pool.AsyncConnectionPool(dsn)
assert p._sched_runner and not p._sched_runner.done()
- for t in p._workers:
+ assert p._workers
+ workers = p._workers[:]
+ for t in workers:
assert not t.done()
await p.close()
assert p._sched_runner is None
- for t in p._workers:
+ assert not p._workers
+ for t in workers:
assert t.done()
assert len(success) == 2
+async def test_reopen(dsn):
+ p = pool.AsyncConnectionPool(dsn)
+ async with p.connection() as conn:
+ await conn.execute("select 1")
+ await p.close()
+ assert p._sched_runner is None
+ p.open()
+ assert p._sched_runner is not None
+ async with p.connection() as conn:
+ await conn.execute("select 1")
+ await p.close()
+
+
@pytest.mark.slow
@pytest.mark.timing
async def test_grow(dsn, monkeypatch, retries):