Standardize the var name to _Self, waiting for PEP 0673.
import logging
from types import TracebackType
-from typing import Any, List, Optional, Union, Tuple, Type, TYPE_CHECKING
+from typing import Any, List, Optional, Union, Tuple, Type, TypeVar, TYPE_CHECKING
from . import pq
from . import errors as e
__module__ = "psycopg"
_conn: "Connection[Any]"
+ _Self = TypeVar("_Self", bound="Pipeline")
def __init__(self, conn: "Connection[Any]") -> None:
super().__init__(conn)
except e.Error as ex:
raise ex.with_traceback(None)
- def __enter__(self) -> "Pipeline":
+ def __enter__(self: _Self) -> _Self:
with self._conn.lock:
self._conn.wait(self._enter_gen())
return self
__module__ = "psycopg"
_conn: "AsyncConnection[Any]"
+ _Self = TypeVar("_Self", bound="AsyncPipeline")
def __init__(self, conn: "AsyncConnection[Any]") -> None:
super().__init__(conn)
except e.Error as ex:
raise ex.with_traceback(None)
- async def __aenter__(self) -> "AsyncPipeline":
+ async def __aenter__(self: _Self) -> _Self:
async with self._conn.lock:
await self._conn.wait(self._enter_gen())
return self
cursor_factory: Type[Cursor[Row]]
server_cursor_factory: Type[ServerCursor[Row]]
row_factory: RowFactory[Row]
-
_pipeline: Optional[Pipeline]
+ _Self = TypeVar("_Self", bound="Connection[Row]")
def __init__(
self,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
) -> "Connection[Row]":
+ # TODO: returned type should be _Self. See #308.
...
@overload
rv.prepare_threshold = prepare_threshold
return rv
- def __enter__(self) -> "Connection[Row]":
+ def __enter__(self: _Self) -> _Self:
return self
def __exit__(
import logging
from types import TracebackType
from typing import Any, AsyncGenerator, AsyncIterator, Dict, List, Optional
-from typing import Type, Union, cast, overload, TYPE_CHECKING
+from typing import Type, TypeVar, Union, cast, overload, TYPE_CHECKING
from contextlib import asynccontextmanager
from . import pq
server_cursor_factory: Type[AsyncServerCursor[Row]]
row_factory: AsyncRowFactory[Row]
_pipeline: Optional[AsyncPipeline]
+ _Self = TypeVar("_Self", bound="AsyncConnection[Row]")
def __init__(
self,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
) -> "AsyncConnection[Row]":
+ # TODO: returned type should be _Self. See #308.
...
@overload
rv.prepare_threshold = prepare_threshold
return rv
- async def __aenter__(self) -> "AsyncConnection[Row]":
+ async def __aenter__(self: _Self) -> _Self:
return self
async def __aexit__(
import threading
from abc import ABC, abstractmethod
from types import TracebackType
-from typing import TYPE_CHECKING, AsyncIterator, Iterator, Generic, Union
-from typing import Any, Dict, List, Match, Optional, Sequence, Type, Tuple
+from typing import Any, AsyncIterator, Dict, Generic, Iterator, List, Match
+from typing import Optional, Sequence, Tuple, Type, TypeVar, Union, TYPE_CHECKING
from . import pq
from . import errors as e
formatting the data in copy format and adding it to the queue.
"""
+ _Self = TypeVar("_Self", bound="BaseCopy[ConnectionType]")
+
# Max size of the write queue of buffers. More than that copy will block
# Each buffer should be around BUFFER_SIZE size.
QUEUE_SIZE = 1024
self._worker: Optional[threading.Thread] = None
self._worker_error: Optional[BaseException] = None
- def __enter__(self) -> "Copy":
+ def __enter__(self: BaseCopy._Self) -> BaseCopy._Self:
self._enter()
return self
self._queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=self.QUEUE_SIZE)
self._worker: Optional[asyncio.Future[None]] = None
- async def __aenter__(self) -> "AsyncCopy":
+ async def __aenter__(self: BaseCopy._Self) -> BaseCopy._Self:
self._enter()
return self
fetch = generators.fetch
send = generators.send
-_C = TypeVar("_C", bound="Cursor[Any]")
-
TEXT = pq.Format.TEXT
BINARY = pq.Format.BINARY
class Cursor(BaseCursor["Connection[Any]", Row]):
__module__ = "psycopg"
__slots__ = ()
+ _Self = TypeVar("_Self", bound="Cursor[Row]")
@overload
def __init__(self: "Cursor[Row]", connection: "Connection[Row]"):
super().__init__(connection)
self._row_factory = row_factory or connection.row_factory
- def __enter__(self: _C) -> _C:
+ def __enter__(self: _Self) -> _Self:
return self
def __exit__(
return self._row_factory(self)
def execute(
- self: _C,
+ self: _Self,
query: Query,
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
binary: Optional[bool] = None,
- ) -> _C:
+ ) -> _Self:
"""
Execute a query or command to the database.
"""
if TYPE_CHECKING:
from .connection_async import AsyncConnection
-_C = TypeVar("_C", bound="AsyncCursor[Any]")
-
class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
__module__ = "psycopg"
__slots__ = ()
+ _Self = TypeVar("_Self", bound="AsyncCursor[Row]")
@overload
def __init__(self: "AsyncCursor[Row]", connection: "AsyncConnection[Row]"):
super().__init__(connection)
self._row_factory = row_factory or connection.row_factory
- async def __aenter__(self: _C) -> _C:
+ async def __aenter__(self: _Self) -> _Self:
return self
async def __aexit__(
return self._row_factory(self)
async def execute(
- self: _C,
+ self: _Self,
query: Query,
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
binary: Optional[bool] = None,
- ) -> _C:
+ ) -> _Self:
try:
async with self._conn.lock:
await self._conn.wait(
return sql.SQL(" ").join(parts)
-_C = TypeVar("_C", bound="ServerCursor[Any]")
-_AC = TypeVar("_AC", bound="AsyncServerCursor[Any]")
-
-
class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]):
__module__ = "psycopg"
__slots__ = ()
+ _Self = TypeVar("_Self", bound="ServerCursor[Row]")
@overload
def __init__(
super().close()
def execute(
- self: _C,
+ self: _Self,
query: Query,
params: Optional[Params] = None,
*,
binary: Optional[bool] = None,
**kwargs: Any,
- ) -> _C:
+ ) -> _Self:
"""
Open a cursor to execute a query to the database.
"""
):
__module__ = "psycopg"
__slots__ = ()
+ _Self = TypeVar("_Self", bound="AsyncServerCursor[Row]")
@overload
def __init__(
await super().close()
async def execute(
- self: _AC,
+ self: _Self,
query: Query,
params: Optional[Params] = None,
*,
binary: Optional[bool] = None,
**kwargs: Any,
- ) -> _AC:
+ ) -> _Self:
if kwargs:
raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
if self._pgconn.pipeline_status:
import logging
from types import TracebackType
-from typing import Generic, Iterator, Optional, Type, Union, TYPE_CHECKING
+from typing import Generic, Iterator, Optional, Type, Union, TypeVar, TYPE_CHECKING
from . import pq
from . import sql
__module__ = "psycopg"
+ _Self = TypeVar("_Self", bound="Transaction")
+
@property
def connection(self) -> "Connection[Any]":
"""The connection the object is managing."""
return self._conn
- def __enter__(self) -> "Transaction":
+ def __enter__(self: _Self) -> _Self:
with self._conn.lock:
self._conn.wait(self._enter_gen())
return self
__module__ = "psycopg"
+ _Self = TypeVar("_Self", bound="AsyncTransaction")
+
@property
def connection(self) -> "AsyncConnection[Any]":
return self._conn
- async def __aenter__(self) -> "AsyncTransaction":
+ async def __aenter__(self: _Self) -> _Self:
async with self._conn.lock:
await self._conn.wait(self._enter_gen())
return self