.. __: https://numpy.org/doc/stable/reference/arrays.scalars.html#built-in-scalar-types
+Psycopg 3.1.17 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Use `typing.Self` as a more correct return value annotation of context
+ managers and other self-returning methods (see :ticket:`708`).
+
+
Current release
---------------
``psycopg_pool`` release notes
==============================
+Future releases
+---------------
+
+psycopg_pool 3.2.1 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Use `typing.Self` as a more correct return value annotation of context
+ managers and other self-returning methods (see :ticket:`708`).
+
+
Current release
---------------
from typing_extensions import TypeGuard
if sys.version_info >= (3, 11):
- from typing import LiteralString
+ from typing import LiteralString, Self
else:
- from typing_extensions import LiteralString
+ from typing_extensions import LiteralString, Self
__all__ = [
"Counter",
"Deque",
"LiteralString",
+ "Self",
"TypeGuard",
"ZoneInfo",
"cache",
from . import pq
from . import errors as e
+from ._compat import Self
from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE
from .generators import copy_to, copy_end
from ._encodings import pgconn_encoding
self.writer = writer
self._write = writer.write
- def __enter__(self: BaseCopy._Self) -> BaseCopy._Self:
+ def __enter__(self) -> Self:
self._enter()
return self
from . import pq
from . import errors as e
+from ._compat import Self
from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE
from .generators import copy_to, copy_end
from ._encodings import pgconn_encoding
self.writer = writer
self._write = writer.write
- async def __aenter__(self: BaseCopy._Self) -> BaseCopy._Self:
+ async def __aenter__(self) -> Self:
self._enter()
return self
import struct
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, List, Match
-from typing import Optional, Sequence, Tuple, TypeVar, Union, TYPE_CHECKING
+from typing import Optional, Sequence, Tuple, Union, TYPE_CHECKING
from . import pq
from . import adapt
a file for later use.
"""
- _Self = TypeVar("_Self", bound="BaseCopy[Any]")
-
formatter: Formatter
def __init__(
import logging
from types import TracebackType
-from typing import Any, List, Optional, Union, Tuple, Type, TypeVar, TYPE_CHECKING
+from typing import Any, List, Optional, Union, Tuple, Type, TYPE_CHECKING
from typing_extensions import TypeAlias
from . import pq
from . import errors as e
from .abc import PipelineCommand, PQGen
-from ._compat import Deque
+from ._compat import Deque, Self
from .pq.misc import connection_summary
from ._encodings import pgconn_encoding
from ._preparing import Key, Prepare
__module__ = "psycopg"
_conn: "Connection[Any]"
- _Self = TypeVar("_Self", bound="Pipeline")
def __init__(self, conn: "Connection[Any]") -> None:
super().__init__(conn)
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
with self._conn.lock:
self._conn.wait(self._enter_gen())
return self
__module__ = "psycopg"
_conn: "AsyncConnection[Any]"
- _Self = TypeVar("_Self", bound="AsyncPipeline")
def __init__(self, conn: "AsyncConnection[Any]") -> None:
super().__init__(conn)
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
async with self._conn.lock:
await self._conn.wait(self._enter_gen())
return self
import logging
from types import TracebackType
from typing import Any, Generator, Iterator, List, Optional
-from typing import Type, TypeVar, Union, cast, overload, TYPE_CHECKING
+from typing import Type, Union, cast, overload, TYPE_CHECKING
from contextlib import contextmanager
from . import pq
from .rows import Row, RowFactory, tuple_row, TupleRow, args_row
from .adapt import AdaptersMap
from ._enums import IsolationLevel
+from ._compat import Self
from .conninfo import ConnDict, make_conninfo, conninfo_to_dict
from .conninfo import conninfo_attempts, timeout_from_conninfo
from ._pipeline import Pipeline
server_cursor_factory: Type[ServerCursor[Row]]
row_factory: RowFactory[Row]
_pipeline: Optional[Pipeline]
- _Self = TypeVar("_Self", bound="Connection[Any]")
def __init__(
self,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
) -> Connection[Row]:
- # TODO: returned type should be _Self. See #308.
+ # TODO: returned type should be Self. See #308.
+ # Unfortunately we cannot use Self[Row] as Self is not parametric.
+ # https://peps.python.org/pep-0673/#use-in-generic-classes
...
@overload
row_factory: Optional[RowFactory[Row]] = None,
cursor_factory: Optional[Type[Cursor[Row]]] = None,
**kwargs: Any,
- ) -> Connection[Any]:
+ ) -> Self:
"""
Connect to a database server and return a new `Connection` instance.
"""
rv.prepare_threshold = prepare_threshold
return rv
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
return self
def __exit__(
import logging
from types import TracebackType
from typing import Any, AsyncGenerator, AsyncIterator, List, Optional
-from typing import Type, TypeVar, Union, cast, overload, TYPE_CHECKING
+from typing import Type, Union, cast, overload, TYPE_CHECKING
from contextlib import asynccontextmanager
from . import pq
from .rows import Row, AsyncRowFactory, tuple_row, TupleRow, args_row
from .adapt import AdaptersMap
from ._enums import IsolationLevel
+from ._compat import Self
from .conninfo import ConnDict, make_conninfo, conninfo_to_dict
from .conninfo import conninfo_attempts_async, timeout_from_conninfo
from ._pipeline import AsyncPipeline
server_cursor_factory: Type[AsyncServerCursor[Row]]
row_factory: AsyncRowFactory[Row]
_pipeline: Optional[AsyncPipeline]
- _Self = TypeVar("_Self", bound="AsyncConnection[Any]")
def __init__(
self,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
) -> AsyncConnection[Row]:
- # TODO: returned type should be _Self. See #308.
+ # TODO: returned type should be Self. See #308.
+ # Unfortunately we cannot use Self[Row] as Self is not parametric.
+ # https://peps.python.org/pep-0673/#use-in-generic-classes
...
@overload
row_factory: Optional[AsyncRowFactory[Row]] = None,
cursor_factory: Optional[Type[AsyncCursor[Row]]] = None,
**kwargs: Any,
- ) -> AsyncConnection[Any]:
+ ) -> Self:
"""
Connect to a database server and return a new `AsyncConnection` instance.
"""
rv.prepare_threshold = prepare_threshold
return rv
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
return self
async def __aexit__(
from .. import errors as e
from ..abc import AdaptContext
from ..rows import Row, RowFactory, AsyncRowFactory, TupleRow
+from .._compat import Self
from ..conninfo import ConnectionInfo
from ..connection import Connection
from .._adapters_map import AdaptersMap
...
@classmethod
- def connect(cls, conninfo: str = "", **kwargs: Any) -> "CrdbConnection[Any]":
+ def connect(cls, conninfo: str = "", **kwargs: Any) -> Self:
"""
Connect to a database server and return a new `CrdbConnection` instance.
"""
...
@classmethod
- async def connect(
- cls, conninfo: str = "", **kwargs: Any
- ) -> "AsyncCrdbConnection[Any]":
- return await super().connect(conninfo, **kwargs) # type: ignore [no-any-return]
+ async def connect(cls, conninfo: str = "", **kwargs: Any) -> Self:
+ return await super().connect(conninfo, **kwargs) # type: ignore[no-any-return]
class CrdbConnectionInfo(ConnectionInfo):
from __future__ import annotations
from types import TracebackType
-from typing import Any, Iterator, Iterable, List, Optional, Type, TypeVar
+from typing import Any, Iterator, Iterable, List, Optional, Type
from typing import TYPE_CHECKING, overload
from contextlib import contextmanager
from .abc import Query, Params
from .copy import Copy, Writer
from .rows import Row, RowMaker, RowFactory
+from ._compat import Self
from ._pipeline import Pipeline
from ._cursor_base import BaseCursor
class Cursor(BaseCursor["Connection[Any]", Row]):
__module__ = "psycopg"
__slots__ = ()
- _Self = TypeVar("_Self", bound="Cursor[Any]")
@overload
def __init__(self: Cursor[Row], connection: Connection[Row]):
super().__init__(connection)
self._row_factory = row_factory or connection.row_factory
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
return self
def __exit__(
return self._row_factory(self)
def execute(
- self: _Self,
+ self,
query: Query,
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
binary: Optional[bool] = None,
- ) -> _Self:
+ ) -> Self:
"""
Execute a query or command to the database.
"""
from __future__ import annotations
from types import TracebackType
-from typing import Any, AsyncIterator, Iterable, List, Optional, Type, TypeVar
+from typing import Any, AsyncIterator, Iterable, List, Optional, Type
from typing import TYPE_CHECKING, overload
from contextlib import asynccontextmanager
from .abc import Query, Params
from .copy import AsyncCopy, AsyncWriter
from .rows import Row, RowMaker, AsyncRowFactory
+from ._compat import Self
from ._pipeline import Pipeline
from ._cursor_base import BaseCursor
class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
__module__ = "psycopg"
__slots__ = ()
- _Self = TypeVar("_Self", bound="AsyncCursor[Any]")
@overload
def __init__(self: AsyncCursor[Row], connection: AsyncConnection[Row]):
super().__init__(connection)
self._row_factory = row_factory or connection.row_factory
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
return self
async def __aexit__(
return self._row_factory(self)
async def execute(
- self: _Self,
+ self,
query: Query,
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
binary: Optional[bool] = None,
- ) -> _Self:
+ ) -> Self:
"""
Execute a query or command to the database.
"""
import inspect
import logging
-from typing import Any, Callable, Type, TypeVar, TYPE_CHECKING
+from typing import Any, Callable, TypeVar, TYPE_CHECKING
from functools import wraps
+from .._compat import Self
from . import PGconn
from .misc import connection_summary
class PGconnDebug:
"""Wrapper for a PQconn logging all its access."""
- _Self = TypeVar("_Self", bound="PGconnDebug")
_pgconn: "abc.PGconn"
def __init__(self, pgconn: "abc.PGconn"):
logger.info("PGconn.%s <- %s", attr, value)
@classmethod
- def connect(cls: Type[_Self], conninfo: bytes) -> _Self:
+ def connect(cls, conninfo: bytes) -> Self:
return cls(debugging(PGconn.connect)(conninfo))
@classmethod
- def connect_start(cls: Type[_Self], conninfo: bytes) -> _Self:
+ def connect_start(cls, conninfo: bytes) -> Self:
return cls(debugging(PGconn.connect_start)(conninfo))
@classmethod
# Copyright (C) 2020 The Psycopg Team
from typing import Any, AsyncIterator, List, Iterable, Iterator
-from typing import Optional, TypeVar, TYPE_CHECKING, overload
+from typing import Optional, TYPE_CHECKING, overload
from warnings import warn
from . import pq
from .abc import ConnectionType, Query, Params, PQGen
from .rows import Row, RowFactory, AsyncRowFactory
from .cursor import Cursor
+from ._compat import Self
from .generators import execute
from ._cursor_base import BaseCursor
from .cursor_async import AsyncCursor
class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]):
__module__ = "psycopg"
__slots__ = ()
- _Self = TypeVar("_Self", bound="ServerCursor[Any]")
@overload
def __init__(
super().close()
def execute(
- self: _Self,
+ self,
query: Query,
params: Optional[Params] = None,
*,
binary: Optional[bool] = None,
**kwargs: Any,
- ) -> _Self:
+ ) -> Self:
"""
Open a cursor to execute a query to the database.
"""
):
__module__ = "psycopg"
__slots__ = ()
- _Self = TypeVar("_Self", bound="AsyncServerCursor[Any]")
@overload
def __init__(
await super().close()
async def execute(
- self: _Self,
+ self,
query: Query,
params: Optional[Params] = None,
*,
binary: Optional[bool] = None,
**kwargs: Any,
- ) -> _Self:
+ ) -> Self:
if kwargs:
raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
if self._pgconn.pipeline_status:
import logging
from types import TracebackType
-from typing import Generic, Iterator, Optional, Type, Union, TypeVar, TYPE_CHECKING
+from typing import Generic, Iterator, Optional, Type, Union, TYPE_CHECKING
from . import pq
from . import sql
from . import errors as e
from .abc import ConnectionType, PQGen
+from ._compat import Self
from .pq.misc import connection_summary
if TYPE_CHECKING:
__module__ = "psycopg"
- _Self = TypeVar("_Self", bound="Transaction")
-
@property
def connection(self) -> "Connection[Any]":
"""The connection the object is managing."""
return self._conn
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
with self._conn.lock:
self._conn.wait(self._enter_gen())
return self
__module__ = "psycopg"
- _Self = TypeVar("_Self", bound="AsyncTransaction")
-
@property
def connection(self) -> "AsyncConnection[Any]":
return self._conn
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
async with self._conn.lock:
await self._conn.wait(self._enter_gen())
return self
else:
from typing import Counter, Deque
+if sys.version_info >= (3, 11):
+ from typing import Self
+else:
+ from typing_extensions import Self
+
__all__ = [
"Counter",
"Deque",
+ "Self",
]
# Workaround for psycopg < 3.0.8.
from time import monotonic
from types import TracebackType
from typing import Any, Iterator, cast, Dict, Generic, List
-from typing import Optional, overload, Sequence, Type, TypeVar
+from typing import Optional, overload, Sequence, Type
from weakref import ref
from contextlib import contextmanager
from .abc import CT, ConnectionCB, ConnectFailedCB
from .base import ConnectionAttempt, BasePool
from .errors import PoolClosed, PoolTimeout, TooManyRequests
-from ._compat import Deque
+from ._compat import Deque, Self
from ._acompat import Condition, Event, Lock, Queue, Worker, spawn, gather
from ._acompat import current_thread_name
from .sched import Scheduler
class ConnectionPool(Generic[CT], BasePool):
- _Self = TypeVar("_Self", bound="ConnectionPool[CT]")
_pool: Deque[CT]
@overload
sched_runner, self._sched_runner = (self._sched_runner, None)
gather(sched_runner, *workers, timeout=timeout)
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
self._open_implicit = False
self.open()
return self
from time import monotonic
from types import TracebackType
from typing import Any, AsyncIterator, cast, Dict, Generic, List
-from typing import Optional, overload, Sequence, Type, TypeVar
+from typing import Optional, overload, Sequence, Type
from weakref import ref
from contextlib import asynccontextmanager
from .abc import ACT, AsyncConnectionCB, AsyncConnectFailedCB
from .base import ConnectionAttempt, BasePool
from .errors import PoolClosed, PoolTimeout, TooManyRequests
-from ._compat import Deque
+from ._compat import Deque, Self
from ._acompat import ACondition, AEvent, ALock, AQueue, AWorker, aspawn, agather
from ._acompat import current_task_name
from .sched_async import AsyncScheduler
class AsyncConnectionPool(Generic[ACT], BasePool):
- _Self = TypeVar("_Self", bound="AsyncConnectionPool[ACT]")
_pool: Deque[ACT]
@overload
sched_runner, self._sched_runner = self._sched_runner, None
await agather(sched_runner, *workers, timeout=timeout)
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
self._open_implicit = False
await self.open()
return self
packages = find:
zip_safe = False
install_requires =
- typing-extensions >= 3.10
+ typing-extensions >= 4.0
[options.package_data]
psycopg_pool = py.typed