``psycopg`` release notes
=========================
+Future releases
+---------------
+
+Psycopg 3.1.17 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Use `typing.Self` as a more correct return value annotation of context
+ managers and other self-returning methods (see :ticket:`708`).
+
+
Current release
---------------
``psycopg_pool`` release notes
==============================
+Future releases
+---------------
+
+psycopg_pool 3.1.10 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Use `typing.Self` as a more correct return value annotation of context
+ managers and other self-returning methods (see :ticket:`708`).
+
+
Current release
---------------
from typing_extensions import TypeGuard
if sys.version_info >= (3, 11):
- from typing import LiteralString
+ from typing import LiteralString, Self
else:
- from typing_extensions import LiteralString
+ from typing_extensions import LiteralString, Self
__all__ = [
"Counter",
"Deque",
"LiteralString",
"Protocol",
+ "Self",
"TypeGuard",
"ZoneInfo",
"cache",
import logging
from types import TracebackType
-from typing import Any, List, Optional, Union, Tuple, Type, TypeVar, TYPE_CHECKING
+from typing import Any, List, Optional, Union, Tuple, Type, TYPE_CHECKING
from typing_extensions import TypeAlias
from . import pq
from . import errors as e
from .abc import PipelineCommand, PQGen
-from ._compat import Deque
+from ._compat import Deque, Self
from .pq.misc import connection_summary
from ._encodings import pgconn_encoding
from ._preparing import Key, Prepare
__module__ = "psycopg"
_conn: "Connection[Any]"
- _Self = TypeVar("_Self", bound="Pipeline")
def __init__(self, conn: "Connection[Any]") -> None:
super().__init__(conn)
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
with self._conn.lock:
self._conn.wait(self._enter_gen())
return self
__module__ = "psycopg"
_conn: "AsyncConnection[Any]"
- _Self = TypeVar("_Self", bound="AsyncPipeline")
def __init__(self, conn: "AsyncConnection[Any]") -> None:
super().__init__(conn)
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
async with self._conn.lock:
await self._conn.wait(self._enter_gen())
return self
from .adapt import AdaptersMap
from ._enums import IsolationLevel
from .cursor import Cursor
-from ._compat import LiteralString
+from ._compat import LiteralString, Self
from .pq.misc import connection_summary
from .conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo
from .conninfo import conninfo_attempts, ConnDict, timeout_from_conninfo
server_cursor_factory: Type[ServerCursor[Row]]
row_factory: RowFactory[Row]
_pipeline: Optional[Pipeline]
- _Self = TypeVar("_Self", bound="Connection[Any]")
def __init__(
self,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
) -> "Connection[Row]":
- # TODO: returned type should be _Self. See #308.
+ # TODO: returned type should be Self. See #308.
+ # Unfortunately we cannot use Self[Row] as Self is not parametric.
+ # https://peps.python.org/pep-0673/#use-in-generic-classes
...
@overload
cursor_factory: Optional[Type[Cursor[Row]]] = None,
context: Optional[AdaptContext] = None,
**kwargs: Any,
- ) -> "Connection[Any]":
+ ) -> Self:
"""
Connect to a database server and return a new `Connection` instance.
"""
rv.prepare_threshold = prepare_threshold
return rv
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
return self
def __exit__(
import logging
from types import TracebackType
from typing import Any, AsyncGenerator, AsyncIterator, List, Optional
-from typing import Type, TypeVar, Union, cast, overload, TYPE_CHECKING
+from typing import Type, Union, cast, overload, TYPE_CHECKING
from contextlib import asynccontextmanager
from . import pq
from .rows import Row, AsyncRowFactory, tuple_row, TupleRow, args_row
from .adapt import AdaptersMap
from ._enums import IsolationLevel
+from ._compat import Self
from .conninfo import ConnDict, make_conninfo, conninfo_to_dict
from .conninfo import conninfo_attempts_async, timeout_from_conninfo
from ._pipeline import AsyncPipeline
server_cursor_factory: Type[AsyncServerCursor[Row]]
row_factory: AsyncRowFactory[Row]
_pipeline: Optional[AsyncPipeline]
- _Self = TypeVar("_Self", bound="AsyncConnection[Any]")
def __init__(
self,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
) -> "AsyncConnection[Row]":
- # TODO: returned type should be _Self. See #308.
+ # TODO: returned type should be Self. See #308.
+ # Unfortunately we cannot use Self[Row] as Self is not parametric.
+ # https://peps.python.org/pep-0673/#use-in-generic-classes
...
@overload
row_factory: Optional[AsyncRowFactory[Row]] = None,
cursor_factory: Optional[Type[AsyncCursor[Row]]] = None,
**kwargs: Any,
- ) -> "AsyncConnection[Any]":
+ ) -> Self:
if sys.platform == "win32":
loop = asyncio.get_running_loop()
if isinstance(loop, asyncio.ProactorEventLoop):
rv.prepare_threshold = prepare_threshold
return rv
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
return self
async def __aexit__(
from abc import ABC, abstractmethod
from types import TracebackType
from typing import Any, AsyncIterator, Dict, Generic, Iterator, List, Match, IO
-from typing import Optional, Sequence, Tuple, Type, TypeVar, Union, TYPE_CHECKING
+from typing import Optional, Sequence, Tuple, Type, Union, TYPE_CHECKING
from . import pq
from . import adapt
from . import errors as e
from .abc import Buffer, ConnectionType, PQGen, Transformer
-from ._compat import create_task
+from ._compat import create_task, Self
from .pq.misc import connection_summary
from ._cmodule import _psycopg
from ._encodings import pgconn_encoding
a file for later use.
"""
- _Self = TypeVar("_Self", bound="BaseCopy[Any]")
-
formatter: "Formatter"
def __init__(
self.writer = writer
self._write = writer.write
- def __enter__(self: BaseCopy._Self) -> BaseCopy._Self:
+ def __enter__(self) -> Self:
self._enter()
return self
self.writer = writer
self._write = writer.write
- async def __aenter__(self: BaseCopy._Self) -> BaseCopy._Self:
+ async def __aenter__(self) -> Self:
self._enter()
return self
from .. import errors as e
from ..abc import AdaptContext
from ..rows import Row, RowFactory, AsyncRowFactory, TupleRow
+from .._compat import Self
from ..conninfo import ConnectionInfo
from ..connection import Connection
from .._adapters_map import AdaptersMap
...
@classmethod
- def connect(cls, conninfo: str = "", **kwargs: Any) -> "CrdbConnection[Any]":
+ def connect(cls, conninfo: str = "", **kwargs: Any) -> Self:
"""
Connect to a database server and return a new `CrdbConnection` instance.
"""
...
@classmethod
- async def connect(
- cls, conninfo: str = "", **kwargs: Any
- ) -> "AsyncCrdbConnection[Any]":
- return await super().connect(conninfo, **kwargs) # type: ignore [no-any-return]
+ async def connect(cls, conninfo: str = "", **kwargs: Any) -> Self:
+ return await super().connect(conninfo, **kwargs) # type: ignore[no-any-return]
class CrdbConnectionInfo(ConnectionInfo):
from functools import partial
from types import TracebackType
from typing import Any, Generic, Iterable, Iterator, List
-from typing import Optional, NoReturn, Sequence, Tuple, Type, TypeVar
+from typing import Optional, NoReturn, Sequence, Tuple, Type
from typing import overload, TYPE_CHECKING
from warnings import warn
from contextlib import contextmanager
from .copy import Copy, Writer as CopyWriter
from .rows import Row, RowMaker, RowFactory
from ._column import Column
+from ._compat import Self
from .pq.misc import connection_summary
from ._queries import PostgresQuery, PostgresClientQuery
from ._pipeline import Pipeline
class Cursor(BaseCursor["Connection[Any]", Row]):
__module__ = "psycopg"
__slots__ = ()
- _Self = TypeVar("_Self", bound="Cursor[Any]")
@overload
def __init__(self: "Cursor[Row]", connection: "Connection[Row]"):
super().__init__(connection)
self._row_factory = row_factory or connection.row_factory
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
return self
def __exit__(
return self._row_factory(self)
def execute(
- self: _Self,
+ self,
query: Query,
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
binary: Optional[bool] = None,
- ) -> _Self:
+ ) -> Self:
"""
Execute a query or command to the database.
"""
from types import TracebackType
from typing import Any, AsyncIterator, Iterable, List
-from typing import Optional, Type, TypeVar, TYPE_CHECKING, overload
+from typing import Optional, Type, TYPE_CHECKING, overload
from contextlib import asynccontextmanager
from . import pq
from .copy import AsyncCopy, AsyncWriter as AsyncCopyWriter
from .rows import Row, RowMaker, AsyncRowFactory
from .cursor import BaseCursor
+from ._compat import Self
from ._pipeline import Pipeline
if TYPE_CHECKING:
class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
__module__ = "psycopg"
__slots__ = ()
- _Self = TypeVar("_Self", bound="AsyncCursor[Any]")
@overload
def __init__(self: "AsyncCursor[Row]", connection: "AsyncConnection[Row]"):
super().__init__(connection)
self._row_factory = row_factory or connection.row_factory
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
return self
async def __aexit__(
return self._row_factory(self)
async def execute(
- self: _Self,
+ self,
query: Query,
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
binary: Optional[bool] = None,
- ) -> _Self:
+ ) -> Self:
try:
async with self._conn.lock:
await self._conn.wait(
import inspect
import logging
-from typing import Any, Callable, Type, TypeVar, TYPE_CHECKING
+from typing import Any, Callable, TypeVar, TYPE_CHECKING
from functools import wraps
+from .._compat import Self
from . import PGconn
from .misc import connection_summary
class PGconnDebug:
"""Wrapper for a PQconn logging all its access."""
- _Self = TypeVar("_Self", bound="PGconnDebug")
_pgconn: "abc.PGconn"
def __init__(self, pgconn: "abc.PGconn"):
logger.info("PGconn.%s <- %s", attr, value)
@classmethod
- def connect(cls: Type[_Self], conninfo: bytes) -> _Self:
+ def connect(cls, conninfo: bytes) -> Self:
return cls(debugging(PGconn.connect)(conninfo))
@classmethod
- def connect_start(cls: Type[_Self], conninfo: bytes) -> _Self:
+ def connect_start(cls, conninfo: bytes) -> Self:
return cls(debugging(PGconn.connect_start)(conninfo))
@classmethod
# Copyright (C) 2020 The Psycopg Team
from typing import Any, AsyncIterator, List, Iterable, Iterator
-from typing import Optional, TypeVar, TYPE_CHECKING, overload
+from typing import Optional, TYPE_CHECKING, overload
from warnings import warn
from . import pq
from .abc import ConnectionType, Query, Params, PQGen
from .rows import Row, RowFactory, AsyncRowFactory
from .cursor import BaseCursor, Cursor
+from ._compat import Self
from .generators import execute
from .cursor_async import AsyncCursor
class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]):
__module__ = "psycopg"
__slots__ = ()
- _Self = TypeVar("_Self", bound="ServerCursor[Any]")
@overload
def __init__(
super().close()
def execute(
- self: _Self,
+ self,
query: Query,
params: Optional[Params] = None,
*,
binary: Optional[bool] = None,
**kwargs: Any,
- ) -> _Self:
+ ) -> Self:
"""
Open a cursor to execute a query to the database.
"""
):
__module__ = "psycopg"
__slots__ = ()
- _Self = TypeVar("_Self", bound="AsyncServerCursor[Any]")
@overload
def __init__(
await super().close()
async def execute(
- self: _Self,
+ self,
query: Query,
params: Optional[Params] = None,
*,
binary: Optional[bool] = None,
**kwargs: Any,
- ) -> _Self:
+ ) -> Self:
if kwargs:
raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
if self._pgconn.pipeline_status:
import logging
from types import TracebackType
-from typing import Generic, Iterator, Optional, Type, Union, TypeVar, TYPE_CHECKING
+from typing import Generic, Iterator, Optional, Type, Union, TYPE_CHECKING
from . import pq
from . import sql
from . import errors as e
from .abc import ConnectionType, PQGen
+from ._compat import Self
from .pq.misc import connection_summary
if TYPE_CHECKING:
__module__ = "psycopg"
- _Self = TypeVar("_Self", bound="Transaction")
-
@property
def connection(self) -> "Connection[Any]":
"""The connection the object is managing."""
return self._conn
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
with self._conn.lock:
self._conn.wait(self._enter_gen())
return self
__module__ = "psycopg"
- _Self = TypeVar("_Self", bound="AsyncTransaction")
-
@property
def connection(self) -> "AsyncConnection[Any]":
return self._conn
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
async with self._conn.lock:
await self._conn.wait(self._enter_gen())
return self
else:
from typing import Counter, Deque
+if sys.version_info >= (3, 11):
+ from typing import Self
+else:
+ from typing_extensions import Self
+
__all__ = [
"Counter",
"Deque",
+ "Self",
"Task",
"create_task",
]
from queue import Queue, Empty
from types import TracebackType
from typing import Any, Callable, Dict, Iterator, List
-from typing import Optional, Sequence, Type, TypeVar
+from typing import Optional, Sequence, Type
from weakref import ref
from contextlib import contextmanager
from .base import ConnectionAttempt, BasePool
from .sched import Scheduler
from .errors import PoolClosed, PoolTimeout, TooManyRequests
-from ._compat import Deque
+from ._compat import Deque, Self
logger = logging.getLogger("psycopg.pool")
class ConnectionPool(BasePool[Connection[Any]]):
- _Self = TypeVar("_Self", bound="ConnectionPool")
-
def __init__(
self,
conninfo: str = "",
timeout,
)
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
self.open()
return self
from time import monotonic
from types import TracebackType
from typing import Any, AsyncIterator, Awaitable, Callable
-from typing import Dict, List, Optional, Sequence, Type, TypeVar
+from typing import Dict, List, Optional, Sequence, Type
from weakref import ref
from contextlib import asynccontextmanager
from .base import ConnectionAttempt, BasePool
from .sched import AsyncScheduler
from .errors import PoolClosed, PoolTimeout, TooManyRequests
-from ._compat import Task, create_task, Deque
+from ._compat import Task, create_task, Deque, Self
logger = logging.getLogger("psycopg.pool")
class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
- _Self = TypeVar("_Self", bound="AsyncConnectionPool")
-
def __init__(
self,
conninfo: str = "",
timeout,
)
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
await self.open()
return self
packages = find:
zip_safe = False
install_requires =
- typing-extensions >= 3.10
+ typing-extensions >= 4.0
[options.package_data]
psycopg_pool = py.typed