import asyncio
import logging
from time import monotonic
-from typing import Any, Awaitable, Callable, Deque, AsyncIterator, Optional
+from types import TracebackType
+from typing import Any, AsyncIterator, Awaitable, Callable, Deque
+from typing import Optional, Type
from collections import deque
from ..pq import TransactionStatus
timeout,
)
+ async def __aenter__(self) -> "AsyncConnectionPool":
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ await self.close()
+
async def resize(
self, minconn: int, maxconn: Optional[int] = None
) -> None:
import logging
import threading
from time import monotonic
-from typing import Any, Callable, Deque, Iterator, Optional
+from types import TracebackType
+from typing import Any, Callable, Deque, Iterator, Optional, Type
from contextlib import contextmanager
from collections import deque
timeout,
)
+ def __enter__(self) -> "ConnectionPool":
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ self.close()
+
def resize(self, minconn: int, maxconn: Optional[int] = None) -> None:
if maxconn is None:
maxconn = minconn
assert conn2.pgconn.backend_pid == pid
+def test_context(dsn):
+ with pool.ConnectionPool(dsn, minconn=1) as p:
+ assert not p.closed
+ assert p.closed
+
+
@pytest.mark.slow
def test_concurrent_filling(dsn, monkeypatch):
delay_connection(monkeypatch, 0.1)
await p.close()
+async def test_context(dsn):
+ async with pool.AsyncConnectionPool(dsn, minconn=1) as p:
+ assert not p.closed
+ assert p.closed
+
+
async def test_connection_not_lost(dsn):
p = pool.AsyncConnectionPool(dsn, minconn=1)
with pytest.raises(ZeroDivisionError):