- Add `ConnectionPool.open()` and `AsyncConnectionPool.open()`
(:ticket:`#155`).
+- Raise an `~psycopg.OperationalError` when trying to re-open a closed pool
+ (:ticket:`#155`).
psycopg_pool 3.0.2
^^^^^^^^^^^^^^^^^^
from typing import Any, Callable, Dict, Generic, Optional
from psycopg.abc import ConnectionType
+from psycopg import errors as e
from ._compat import Counter, Deque
# connections to the pool.
self._growing = False
+ self._opened = False
self._closed = True
def __repr__(self) -> str:
"""`!True` if the pool is closed."""
return self._closed
+ def _check_open(self) -> None:
+ if self._closed and self._opened:
+ raise e.OperationalError(
+ "pool has already been opened/closed and cannot be reused"
+ )
+
def get_stats(self) -> Dict[str, int]:
"""
Return current stats about the pool usage.
if not self._closed:
return
+ self._check_open()
+
self._sched_runner = threading.Thread(
target=self._sched.run, name=f"{self.name}-scheduler", daemon=True
)
self.schedule_task(ShrinkPool(self), self.max_idle)
self._closed = False
+ self._opened = True
def close(self, timeout: float = 5.0) -> None:
"""Close the pool and make it unavailable to new clients.
if not self._closed:
return
+ self._check_open()
+
self._sched_runner = create_task(
self._sched.run(), name=f"{self.name}-scheduler"
)
self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))
self._closed = False
+ self._opened = True
async def close(self, timeout: float = 5.0) -> None:
if self._closed:
import pytest
import psycopg
+from psycopg.errors import OperationalError
from psycopg.pq import TransactionStatus
from psycopg._compat import Counter
p.close()
assert p._sched_runner is None
assert not p._workers
- p.open()
- assert p._sched_runner is not None
- assert p._workers
- with p.connection() as conn:
- conn.execute("select 1")
- p.close()
+
+ with pytest.raises(OperationalError, match="cannot be reused"):
+ p.open()
@pytest.mark.slow
import pytest
import psycopg
+from psycopg.errors import OperationalError
from psycopg.pq import TransactionStatus
from psycopg._compat import create_task, Counter
await conn.execute("select 1")
await p.close()
assert p._sched_runner is None
- p.open()
- assert p._sched_runner is not None
- async with p.connection() as conn:
- await conn.execute("select 1")
- await p.close()
+
+ with pytest.raises(OperationalError, match="cannot be reused"):
+ p.open()
@pytest.mark.slow