from psycopg.abc import ConnectionType
from psycopg import errors as e
+from .errors import PoolClosed
from ._compat import Counter, Deque
"pool has already been opened/closed and cannot be reused"
)
+ def _check_open_getconn(self) -> None:
+ if self._closed:
+ if self._opened:
+ raise PoolClosed(
+ f"the pool {self.name!r} has already been closed"
+ )
+ else:
+ raise PoolClosed(
+ f"the pool {self.name!r} has not been opened yet"
+ )
+
def get_stats(self) -> Dict[str, int]:
"""
Return current stats about the pool usage.
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
with self._lock:
- if self._closed:
- raise PoolClosed(f"the pool {self.name!r} is closed")
+ self._check_open_getconn()
pos: Optional[WaitingClient] = None
if self._pool:
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
async with self._lock:
- if self._closed:
- raise PoolClosed(f"the pool {self.name!r} is closed")
+ self._check_open_getconn()
pos: Optional[AsyncClient] = None
if self._pool:
def test_open_explicit(dsn):
p = pool.ConnectionPool(dsn, open=False)
assert p.closed
- with pytest.raises(pool.PoolClosed):
+ with pytest.raises(pool.PoolClosed, match="has not been opened yet"):
p.getconn()
with pytest.raises(pool.PoolClosed):
finally:
p.close()
+ with pytest.raises(pool.PoolClosed, match="has already been closed"):
+ p.getconn()
+
def test_open_context(dsn):
p = pool.ConnectionPool(dsn, open=False)
with pytest.raises(pool.PoolClosed):
await p.getconn()
- with pytest.raises(pool.PoolClosed):
+ with pytest.raises(pool.PoolClosed, match="has not been opened yet"):
async with p.connection():
pass
finally:
await p.close()
+ with pytest.raises(pool.PoolClosed, match="has already been closed"):
+ await p.getconn()
+
async def test_open_context(dsn):
p = pool.AsyncConnectionPool(dsn, open=False)