``psycopg_pool`` release notes
==============================
+Future releases
+---------------
+
+psycopg_pool 3.2.1 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Use `typing.Self` as a more correct return value annotation of context
+ managers and other self-returning methods (see :ticket:`708`).
+
+
Current release
---------------
it will become an error. (:ticket:`#659`).
-psycopg_pool 3.1.9 (unreleased)
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+psycopg_pool 3.1.9
+^^^^^^^^^^^^^^^^^^
- Fix the return type annotation of `!NullConnectionPool.__enter__()`
(:ticket:`#540`).
else:
from typing import Counter, Deque
+if sys.version_info >= (3, 11):
+ from typing import Self
+else:
+ from typing_extensions import Self
+
__all__ = [
"Counter",
"Deque",
+ "Self",
]
# Workaround for psycopg < 3.0.8.
from time import monotonic
from types import TracebackType
from typing import Any, Iterator, cast, Dict, Generic, List
-from typing import Optional, overload, Sequence, Type, TypeVar
+from typing import Optional, overload, Sequence, Type
from weakref import ref
from contextlib import contextmanager
from .abc import CT, ConnectionCB, ConnectFailedCB
from .base import ConnectionAttempt, BasePool
from .errors import PoolClosed, PoolTimeout, TooManyRequests
-from ._compat import Deque
+from ._compat import Deque, Self
from ._acompat import Condition, Event, Lock, Queue, Worker, spawn, gather
from ._acompat import current_thread_name
from .sched import Scheduler
class ConnectionPool(Generic[CT], BasePool):
- _Self = TypeVar("_Self", bound="ConnectionPool[CT]")
_pool: Deque[CT]
@overload
sched_runner, self._sched_runner = (self._sched_runner, None)
gather(sched_runner, *workers, timeout=timeout)
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
self._open_implicit = False
self.open()
return self
from time import monotonic
from types import TracebackType
from typing import Any, AsyncIterator, cast, Dict, Generic, List
-from typing import Optional, overload, Sequence, Type, TypeVar
+from typing import Optional, overload, Sequence, Type
from weakref import ref
from contextlib import asynccontextmanager
from .abc import ACT, AsyncConnectionCB, AsyncConnectFailedCB
from .base import ConnectionAttempt, BasePool
from .errors import PoolClosed, PoolTimeout, TooManyRequests
-from ._compat import Deque
+from ._compat import Deque, Self
from ._acompat import ACondition, AEvent, ALock, AQueue, AWorker, aspawn, agather
from ._acompat import current_task_name
from .sched_async import AsyncScheduler
class AsyncConnectionPool(Generic[ACT], BasePool):
- _Self = TypeVar("_Self", bound="AsyncConnectionPool[ACT]")
_pool: Deque[ACT]
@overload
sched_runner, self._sched_runner = self._sched_runner, None
await agather(sched_runner, *workers, timeout=timeout)
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
self._open_implicit = False
await self.open()
return self
packages = find:
zip_safe = False
install_requires =
- typing-extensions >= 3.10
+ typing-extensions >= 4.0
[options.package_data]
psycopg_pool = py.typed