from typing_extensions import TypeGuard
if sys.version_info >= (3, 11):
- from typing import LiteralString
+ from typing import LiteralString, Self
else:
- from typing_extensions import LiteralString
+ from typing_extensions import LiteralString, Self
if sys.version_info >= (3, 13):
from typing import TypeVar
"Counter",
"Deque",
"LiteralString",
+ "Self",
"TypeGuard",
"TypeVar",
"ZoneInfo",
from . import pq
from . import errors as e
+from ._compat import Self
from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE
from .generators import copy_to, copy_end
from ._encodings import pgconn_encoding
self.writer = writer
self._write = writer.write
- def __enter__(self: BaseCopy._Self) -> BaseCopy._Self:
+ def __enter__(self) -> Self:
self._enter()
return self
from . import pq
from . import errors as e
+from ._compat import Self
from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE
from .generators import copy_to, copy_end
from ._encodings import pgconn_encoding
self.writer = writer
self._write = writer.write
- async def __aenter__(self: BaseCopy._Self) -> BaseCopy._Self:
+ async def __aenter__(self) -> Self:
self._enter()
return self
import struct
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, List, Match
-from typing import Optional, Sequence, Tuple, TypeVar, Union, TYPE_CHECKING
+from typing import Optional, Sequence, Tuple, Union, TYPE_CHECKING
from . import pq
from . import adapt
a file for later use.
"""
- _Self = TypeVar("_Self", bound="BaseCopy[Any]")
-
formatter: Formatter
def __init__(
import logging
from types import TracebackType
-from typing import Any, List, Optional, Union, Tuple, Type, TypeVar, TYPE_CHECKING
+from typing import Any, List, Optional, Union, Tuple, Type, TYPE_CHECKING
from typing_extensions import TypeAlias
from . import pq
from . import errors as e
from .abc import PipelineCommand, PQGen
-from ._compat import Deque
+from ._compat import Deque, Self
from .pq.misc import connection_summary
from ._encodings import pgconn_encoding
from ._preparing import Key, Prepare
__module__ = "psycopg"
_conn: "Connection[Any]"
- _Self = TypeVar("_Self", bound="Pipeline")
def __init__(self, conn: "Connection[Any]") -> None:
super().__init__(conn)
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
with self._conn.lock:
self._conn.wait(self._enter_gen())
return self
__module__ = "psycopg"
_conn: "AsyncConnection[Any]"
- _Self = TypeVar("_Self", bound="AsyncPipeline")
def __init__(self, conn: "AsyncConnection[Any]") -> None:
super().__init__(conn)
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
async with self._conn.lock:
await self._conn.wait(self._enter_gen())
return self
import logging
from types import TracebackType
from typing import Any, Generator, Iterator, Dict, List, Optional
-from typing import Type, TypeVar, Union, cast, overload, TYPE_CHECKING
+from typing import Type, Union, cast, overload, TYPE_CHECKING
from contextlib import contextmanager
from . import pq
from .rows import Row, RowFactory, tuple_row, TupleRow, args_row
from .adapt import AdaptersMap
from ._enums import IsolationLevel
+from ._compat import Self
from .conninfo import make_conninfo, conninfo_to_dict
from ._pipeline import Pipeline
from ._encodings import pgconn_encoding
server_cursor_factory: Type[ServerCursor[Row]]
row_factory: RowFactory[Row]
_pipeline: Optional[Pipeline]
- _Self = TypeVar("_Self", bound="Connection[Any]")
def __init__(
self,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
) -> Connection[Row]:
- # TODO: returned type should be _Self. See #308.
+ # TODO: returned type should be Self. See #308.
+ # Unfortunately we cannot use Self[Row] as Self is not parametric.
+ # https://peps.python.org/pep-0673/#use-in-generic-classes
...
@overload
row_factory: Optional[RowFactory[Row]] = None,
cursor_factory: Optional[Type[Cursor[Row]]] = None,
**kwargs: Any,
- ) -> Connection[Any]:
+ ) -> Self:
"""
Connect to a database server and return a new `Connection` instance.
"""
rv.prepare_threshold = prepare_threshold
return rv
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
return self
def __exit__(
import logging
from types import TracebackType
from typing import Any, AsyncGenerator, AsyncIterator, Dict, List, Optional
-from typing import Type, TypeVar, Union, cast, overload, TYPE_CHECKING
+from typing import Type, Union, cast, overload, TYPE_CHECKING
from contextlib import asynccontextmanager
from . import pq
from .rows import Row, AsyncRowFactory, tuple_row, TupleRow, args_row
from .adapt import AdaptersMap
from ._enums import IsolationLevel
+from ._compat import Self
from .conninfo import make_conninfo, conninfo_to_dict
from ._pipeline import AsyncPipeline
from ._encodings import pgconn_encoding
server_cursor_factory: Type[AsyncServerCursor[Row]]
row_factory: AsyncRowFactory[Row]
_pipeline: Optional[AsyncPipeline]
- _Self = TypeVar("_Self", bound="AsyncConnection[Any]")
def __init__(
self,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
) -> AsyncConnection[Row]:
- # TODO: returned type should be _Self. See #308.
+ # TODO: returned type should be Self. See #308.
+ # Unfortunately we cannot use Self[Row] as Self is not parametric.
+ # https://peps.python.org/pep-0673/#use-in-generic-classes
...
@overload
row_factory: Optional[AsyncRowFactory[Row]] = None,
cursor_factory: Optional[Type[AsyncCursor[Row]]] = None,
**kwargs: Any,
- ) -> AsyncConnection[Any]:
+ ) -> Self:
"""
Connect to a database server and return a new `AsyncConnection` instance.
"""
rv.prepare_threshold = prepare_threshold
return rv
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
return self
async def __aexit__(
from .. import errors as e
from ..abc import AdaptContext
from ..rows import Row, RowFactory, AsyncRowFactory, TupleRow
+from .._compat import Self
from ..conninfo import ConnectionInfo
from ..connection import Connection
from .._adapters_map import AdaptersMap
...
@classmethod
- def connect(cls, conninfo: str = "", **kwargs: Any) -> "CrdbConnection[Any]":
+ def connect(cls, conninfo: str = "", **kwargs: Any) -> Self:
"""
Connect to a database server and return a new `CrdbConnection` instance.
"""
...
@classmethod
- async def connect(
- cls, conninfo: str = "", **kwargs: Any
- ) -> "AsyncCrdbConnection[Any]":
- return await super().connect(conninfo, **kwargs) # type: ignore [no-any-return]
+ async def connect(cls, conninfo: str = "", **kwargs: Any) -> Self:
+ return await super().connect(conninfo, **kwargs) # type: ignore[no-any-return]
class CrdbConnectionInfo(ConnectionInfo):
from __future__ import annotations
from types import TracebackType
-from typing import Any, Iterator, Iterable, List, Optional, Type, TypeVar
+from typing import Any, Iterator, Iterable, List, Optional, Type
from typing import TYPE_CHECKING, overload
from contextlib import contextmanager
from .abc import Query, Params
from .copy import Copy, Writer
from .rows import Row, RowMaker, RowFactory
+from ._compat import Self
from ._pipeline import Pipeline
from ._cursor_base import BaseCursor
class Cursor(BaseCursor["Connection[Any]", Row]):
__module__ = "psycopg"
__slots__ = ()
- _Self = TypeVar("_Self", bound="Cursor[Any]")
@overload
def __init__(self: Cursor[Row], connection: Connection[Row]):
super().__init__(connection)
self._row_factory = row_factory or connection.row_factory
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
return self
def __exit__(
return self._row_factory(self)
def execute(
- self: _Self,
+ self,
query: Query,
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
binary: Optional[bool] = None,
- ) -> _Self:
+ ) -> Self:
"""
Execute a query or command to the database.
"""
from __future__ import annotations
from types import TracebackType
-from typing import Any, AsyncIterator, Iterable, List, Optional, Type, TypeVar
+from typing import Any, AsyncIterator, Iterable, List, Optional, Type
from typing import TYPE_CHECKING, overload
from contextlib import asynccontextmanager
from .abc import Query, Params
from .copy import AsyncCopy, AsyncWriter
from .rows import Row, RowMaker, AsyncRowFactory
+from ._compat import Self
from ._pipeline import Pipeline
from ._cursor_base import BaseCursor
class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
__module__ = "psycopg"
__slots__ = ()
- _Self = TypeVar("_Self", bound="AsyncCursor[Any]")
@overload
def __init__(self: AsyncCursor[Row], connection: AsyncConnection[Row]):
super().__init__(connection)
self._row_factory = row_factory or connection.row_factory
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
return self
async def __aexit__(
return self._row_factory(self)
async def execute(
- self: _Self,
+ self,
query: Query,
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
binary: Optional[bool] = None,
- ) -> _Self:
+ ) -> Self:
"""
Execute a query or command to the database.
"""
import inspect
import logging
-from typing import Any, Callable, Type, TYPE_CHECKING
+from typing import Any, Callable, TYPE_CHECKING
from functools import wraps
-from .._compat import TypeVar
+from .._compat import Self, TypeVar
from . import PGconn
from .misc import connection_summary
class PGconnDebug:
"""Wrapper for a PQconn logging all its access."""
- _Self = TypeVar("_Self", bound="PGconnDebug")
_pgconn: "abc.PGconn"
def __init__(self, pgconn: "abc.PGconn"):
logger.info("PGconn.%s <- %s", attr, value)
@classmethod
- def connect(cls: Type[_Self], conninfo: bytes) -> _Self:
+ def connect(cls, conninfo: bytes) -> Self:
return cls(debugging(PGconn.connect)(conninfo))
@classmethod
- def connect_start(cls: Type[_Self], conninfo: bytes) -> _Self:
+ def connect_start(cls, conninfo: bytes) -> Self:
return cls(debugging(PGconn.connect_start)(conninfo))
@classmethod
# Copyright (C) 2020 The Psycopg Team
from typing import Any, AsyncIterator, List, Iterable, Iterator
-from typing import Optional, TypeVar, TYPE_CHECKING, overload
+from typing import Optional, TYPE_CHECKING, overload
from warnings import warn
from . import pq
from .abc import ConnectionType, Query, Params, PQGen
from .rows import Row, RowFactory, AsyncRowFactory
from .cursor import Cursor
+from ._compat import Self
from .generators import execute
from ._cursor_base import BaseCursor
from .cursor_async import AsyncCursor
class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]):
__module__ = "psycopg"
__slots__ = ()
- _Self = TypeVar("_Self", bound="ServerCursor[Any]")
@overload
def __init__(
super().close()
def execute(
- self: _Self,
+ self,
query: Query,
params: Optional[Params] = None,
*,
binary: Optional[bool] = None,
**kwargs: Any,
- ) -> _Self:
+ ) -> Self:
"""
Open a cursor to execute a query to the database.
"""
):
__module__ = "psycopg"
__slots__ = ()
- _Self = TypeVar("_Self", bound="AsyncServerCursor[Any]")
@overload
def __init__(
await super().close()
async def execute(
- self: _Self,
+ self,
query: Query,
params: Optional[Params] = None,
*,
binary: Optional[bool] = None,
**kwargs: Any,
- ) -> _Self:
+ ) -> Self:
if kwargs:
raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
if self._pgconn.pipeline_status:
import logging
from types import TracebackType
-from typing import Generic, Iterator, Optional, Type, Union, TypeVar, TYPE_CHECKING
+from typing import Generic, Iterator, Optional, Type, Union, TYPE_CHECKING
from . import pq
from . import sql
from . import errors as e
from .abc import ConnectionType, PQGen
+from ._compat import Self
from .pq.misc import connection_summary
if TYPE_CHECKING:
__module__ = "psycopg"
- _Self = TypeVar("_Self", bound="Transaction")
-
@property
def connection(self) -> "Connection[Any]":
"""The connection the object is managing."""
return self._conn
- def __enter__(self: _Self) -> _Self:
+ def __enter__(self) -> Self:
with self._conn.lock:
self._conn.wait(self._enter_gen())
return self
__module__ = "psycopg"
- _Self = TypeVar("_Self", bound="AsyncTransaction")
-
@property
def connection(self) -> "AsyncConnection[Any]":
return self._conn
- async def __aenter__(self: _Self) -> _Self:
+ async def __aenter__(self) -> Self:
async with self._conn.lock:
await self._conn.wait(self._enter_gen())
return self