[flake8]
max-line-length = 88
-ignore = W503, E203
+ignore = W503, E203, E704
extend-exclude = .venv build
per-file-ignores =
# Autogenerated section
[flake8]
max-line-length = 88
-ignore = W503, E203
+ignore = W503, E203, E704
per-file-ignores =
# Autogenerated section
psycopg/errors.py: E125, E128, E302
class UnpackLen(Protocol):
- def __call__(self, data: Buffer, start: Optional[int]) -> Tuple[int]:
- ...
+ def __call__(self, data: Buffer, start: Optional[int]) -> Tuple[int]: ...
pack_int2 = cast(PackInt, struct.Struct("!h").pack)
@classmethod
def fetch(
cls: Type[T], conn: "Connection[Any]", name: Union[str, "Identifier"]
- ) -> Optional[T]:
- ...
+ ) -> Optional[T]: ...
@overload
@classmethod
async def fetch(
cls: Type[T], conn: "AsyncConnection[Any]", name: Union[str, "Identifier"]
- ) -> Optional[T]:
- ...
+ ) -> Optional[T]: ...
@classmethod
def fetch(
yield t
@overload
- def __getitem__(self, key: Union[str, int]) -> TypeInfo:
- ...
+ def __getitem__(self, key: Union[str, int]) -> TypeInfo: ...
@overload
- def __getitem__(self, key: Tuple[Type[T], int]) -> T:
- ...
+ def __getitem__(self, key: Tuple[Type[T], int]) -> T: ...
def __getitem__(self, key: RegistryKey) -> TypeInfo:
"""
raise KeyError(f"couldn't find the type {key!r} in the types registry")
@overload
- def get(self, key: Union[str, int]) -> Optional[TypeInfo]:
- ...
+ def get(self, key: Union[str, int]) -> Optional[TypeInfo]: ...
@overload
- def get(self, key: Tuple[Type[T], int]) -> Optional[T]:
- ...
+ def get(self, key: Tuple[Type[T], int]) -> Optional[T]: ...
def get(self, key: RegistryKey) -> Optional[TypeInfo]:
"""
def __call__(
self, gen: PQGen[RV], fileno: int, timeout: Optional[float] = None
- ) -> RV:
- ...
+ ) -> RV: ...
# Adaptation types
oid: int
"""The oid to pass to the server, if known; 0 otherwise (class attribute)."""
- def __init__(self, cls: type, context: Optional[AdaptContext] = None):
- ...
+ def __init__(self, cls: type, context: Optional[AdaptContext] = None): ...
def dump(self, obj: Any) -> Buffer:
"""Convert the object `!obj` to PostgreSQL representation.
This is a class attribute.
"""
- def __init__(self, oid: int, context: Optional[AdaptContext] = None):
- ...
+ def __init__(self, oid: int, context: Optional[AdaptContext] = None): ...
def load(self, data: Buffer) -> Any:
"""
types: Optional[Tuple[int, ...]]
formats: Optional[List[pq.Format]]
- def __init__(self, context: Optional[AdaptContext] = None):
- ...
+ def __init__(self, context: Optional[AdaptContext] = None): ...
@classmethod
- def from_context(cls, context: Optional[AdaptContext]) -> "Transformer":
- ...
+ def from_context(cls, context: Optional[AdaptContext]) -> "Transformer": ...
@property
- def connection(self) -> Optional["BaseConnection[Any]"]:
- ...
+ def connection(self) -> Optional["BaseConnection[Any]"]: ...
@property
- def encoding(self) -> str:
- ...
+ def encoding(self) -> str: ...
@property
- def adapters(self) -> "AdaptersMap":
- ...
+ def adapters(self) -> "AdaptersMap": ...
@property
- def pgresult(self) -> Optional["PGresult"]:
- ...
+ def pgresult(self) -> Optional["PGresult"]: ...
def set_pgresult(
self,
*,
set_loaders: bool = True,
format: Optional[pq.Format] = None
- ) -> None:
- ...
+ ) -> None: ...
- def set_dumper_types(self, types: Sequence[int], format: pq.Format) -> None:
- ...
+ def set_dumper_types(self, types: Sequence[int], format: pq.Format) -> None: ...
- def set_loader_types(self, types: Sequence[int], format: pq.Format) -> None:
- ...
+ def set_loader_types(self, types: Sequence[int], format: pq.Format) -> None: ...
def dump_sequence(
self, params: Sequence[Any], formats: Sequence[PyFormat]
- ) -> Sequence[Optional[Buffer]]:
- ...
+ ) -> Sequence[Optional[Buffer]]: ...
- def as_literal(self, obj: Any) -> bytes:
- ...
+ def as_literal(self, obj: Any) -> bytes: ...
- def get_dumper(self, obj: Any, format: PyFormat) -> Dumper:
- ...
+ def get_dumper(self, obj: Any, format: PyFormat) -> Dumper: ...
- def load_rows(self, row0: int, row1: int, make_row: "RowMaker[Row]") -> List["Row"]:
- ...
+ def load_rows(
+ self, row0: int, row1: int, make_row: "RowMaker[Row]"
+ ) -> List["Row"]: ...
- def load_row(self, row: int, make_row: "RowMaker[Row]") -> Optional["Row"]:
- ...
+ def load_row(self, row: int, make_row: "RowMaker[Row]") -> Optional["Row"]: ...
- def load_sequence(self, record: Sequence[Optional[Buffer]]) -> Tuple[Any, ...]:
- ...
+ def load_sequence(self, record: Sequence[Optional[Buffer]]) -> Tuple[Any, ...]: ...
- def get_loader(self, oid: int, format: pq.Format) -> Loader:
- ...
+ def get_loader(self, oid: int, format: pq.Format) -> Loader: ...
)
@abstractmethod
- def dump(self, obj: Any) -> Buffer:
- ...
+ def dump(self, obj: Any) -> Buffer: ...
def quote(self, obj: Any) -> Buffer:
"""
cursor_factory: Optional[Type[Cursor[Any]]] = None,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
- ) -> "Connection[TupleRow]":
- ...
+ ) -> "Connection[TupleRow]": ...
@classmethod # type: ignore[misc] # https://github.com/python/mypy/issues/11004
def connect(
self.pgconn.finish()
@overload
- def cursor(self, *, binary: bool = False) -> Cursor[Row]:
- ...
+ def cursor(self, *, binary: bool = False) -> Cursor[Row]: ...
@overload
def cursor(
self, *, binary: bool = False, row_factory: RowFactory[CursorRow]
- ) -> Cursor[CursorRow]:
- ...
+ ) -> Cursor[CursorRow]: ...
@overload
def cursor(
binary: bool = False,
scrollable: Optional[bool] = None,
withhold: bool = False,
- ) -> ServerCursor[Row]:
- ...
+ ) -> ServerCursor[Row]: ...
@overload
def cursor(
row_factory: RowFactory[CursorRow],
scrollable: Optional[bool] = None,
withhold: bool = False,
- ) -> ServerCursor[CursorRow]:
- ...
+ ) -> ServerCursor[CursorRow]: ...
def cursor(
self,
cursor_factory: Optional[Type[AsyncCursor[Any]]] = None,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
- ) -> "AsyncConnection[TupleRow]":
- ...
+ ) -> "AsyncConnection[TupleRow]": ...
@classmethod # type: ignore[misc] # https://github.com/python/mypy/issues/11004
async def connect(
self.pgconn.finish()
@overload
- def cursor(self, *, binary: bool = False) -> AsyncCursor[Row]:
- ...
+ def cursor(self, *, binary: bool = False) -> AsyncCursor[Row]: ...
@overload
def cursor(
self, *, binary: bool = False, row_factory: AsyncRowFactory[CursorRow]
- ) -> AsyncCursor[CursorRow]:
- ...
+ ) -> AsyncCursor[CursorRow]: ...
@overload
def cursor(
binary: bool = False,
scrollable: Optional[bool] = None,
withhold: bool = False,
- ) -> AsyncServerCursor[Row]:
- ...
+ ) -> AsyncServerCursor[Row]: ...
@overload
def cursor(
row_factory: AsyncRowFactory[CursorRow],
scrollable: Optional[bool] = None,
withhold: bool = False,
- ) -> AsyncServerCursor[CursorRow]:
- ...
+ ) -> AsyncServerCursor[CursorRow]: ...
def cursor(
self,
"""
@abstractmethod
- async def write(self, data: Buffer) -> None:
- ...
+ async def write(self, data: Buffer) -> None: ...
async def finish(self, exc: Optional[BaseException] = None) -> None:
pass
self._row_mode = False # true if the user is using write_row()
@abstractmethod
- def parse_row(self, data: Buffer) -> Optional[Tuple[Any, ...]]:
- ...
+ def parse_row(self, data: Buffer) -> Optional[Tuple[Any, ...]]: ...
@abstractmethod
- def write(self, buffer: Union[Buffer, str]) -> Buffer:
- ...
+ def write(self, buffer: Union[Buffer, str]) -> Buffer: ...
@abstractmethod
- def write_row(self, row: Sequence[Any]) -> Buffer:
- ...
+ def write_row(self, row: Sequence[Any]) -> Buffer: ...
@abstractmethod
- def end(self) -> Buffer:
- ...
+ def end(self) -> Buffer: ...
class TextFormatter(Formatter):
cursor_factory: "Optional[Type[Cursor[Row]]]" = None,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
- ) -> "CrdbConnection[Row]":
- ...
+ ) -> "CrdbConnection[Row]": ...
@overload
@classmethod
cursor_factory: "Optional[Type[Cursor[Any]]]" = None,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
- ) -> "CrdbConnection[TupleRow]":
- ...
+ ) -> "CrdbConnection[TupleRow]": ...
@classmethod
def connect(cls, conninfo: str = "", **kwargs: Any) -> Self:
cursor_factory: "Optional[Type[AsyncCursor[Row]]]" = None,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
- ) -> "AsyncCrdbConnection[Row]":
- ...
+ ) -> "AsyncCrdbConnection[Row]": ...
@overload
@classmethod
cursor_factory: "Optional[Type[AsyncCursor[Any]]]" = None,
context: Optional[AdaptContext] = None,
**kwargs: Union[None, int, str],
- ) -> "AsyncCrdbConnection[TupleRow]":
- ...
+ ) -> "AsyncCrdbConnection[TupleRow]": ...
@classmethod
async def connect(cls, conninfo: str = "", **kwargs: Any) -> Self:
__slots__ = ()
@overload
- def __init__(self, connection: "Connection[Row]"):
- ...
+ def __init__(self, connection: "Connection[Row]"): ...
@overload
- def __init__(self, connection: "Connection[Any]", *, row_factory: RowFactory[Row]):
- ...
+ def __init__(
+ self, connection: "Connection[Any]", *, row_factory: RowFactory[Row]
+ ): ...
def __init__(
self,
__slots__ = ()
@overload
- def __init__(self, connection: "AsyncConnection[Row]"):
- ...
+ def __init__(self, connection: "AsyncConnection[Row]"): ...
@overload
def __init__(
self, connection: "AsyncConnection[Any]", *, row_factory: AsyncRowFactory[Row]
- ):
- ...
+ ): ...
def __init__(
self,
notify_handler: Optional[Callable[["PGnotify"], None]]
@classmethod
- def connect(cls, conninfo: bytes) -> "PGconn":
- ...
+ def connect(cls, conninfo: bytes) -> "PGconn": ...
@classmethod
- def connect_start(cls, conninfo: bytes) -> "PGconn":
- ...
+ def connect_start(cls, conninfo: bytes) -> "PGconn": ...
- def connect_poll(self) -> int:
- ...
+ def connect_poll(self) -> int: ...
- def finish(self) -> None:
- ...
+ def finish(self) -> None: ...
@property
- def info(self) -> List["ConninfoOption"]:
- ...
+ def info(self) -> List["ConninfoOption"]: ...
- def reset(self) -> None:
- ...
+ def reset(self) -> None: ...
- def reset_start(self) -> None:
- ...
+ def reset_start(self) -> None: ...
- def reset_poll(self) -> int:
- ...
+ def reset_poll(self) -> int: ...
@classmethod
- def ping(self, conninfo: bytes) -> int:
- ...
+ def ping(self, conninfo: bytes) -> int: ...
@property
- def db(self) -> bytes:
- ...
+ def db(self) -> bytes: ...
@property
- def user(self) -> bytes:
- ...
+ def user(self) -> bytes: ...
@property
- def password(self) -> bytes:
- ...
+ def password(self) -> bytes: ...
@property
- def host(self) -> bytes:
- ...
+ def host(self) -> bytes: ...
@property
- def hostaddr(self) -> bytes:
- ...
+ def hostaddr(self) -> bytes: ...
@property
- def port(self) -> bytes:
- ...
+ def port(self) -> bytes: ...
@property
- def tty(self) -> bytes:
- ...
+ def tty(self) -> bytes: ...
@property
- def options(self) -> bytes:
- ...
+ def options(self) -> bytes: ...
@property
- def status(self) -> int:
- ...
+ def status(self) -> int: ...
@property
- def transaction_status(self) -> int:
- ...
+ def transaction_status(self) -> int: ...
- def parameter_status(self, name: bytes) -> Optional[bytes]:
- ...
+ def parameter_status(self, name: bytes) -> Optional[bytes]: ...
@property
- def error_message(self) -> bytes:
- ...
+ def error_message(self) -> bytes: ...
@property
- def server_version(self) -> int:
- ...
+ def server_version(self) -> int: ...
@property
- def socket(self) -> int:
- ...
+ def socket(self) -> int: ...
@property
- def backend_pid(self) -> int:
- ...
+ def backend_pid(self) -> int: ...
@property
- def needs_password(self) -> bool:
- ...
+ def needs_password(self) -> bool: ...
@property
- def used_password(self) -> bool:
- ...
+ def used_password(self) -> bool: ...
@property
- def ssl_in_use(self) -> bool:
- ...
+ def ssl_in_use(self) -> bool: ...
- def exec_(self, command: bytes) -> "PGresult":
- ...
+ def exec_(self, command: bytes) -> "PGresult": ...
- def send_query(self, command: bytes) -> None:
- ...
+ def send_query(self, command: bytes) -> None: ...
def exec_params(
self,
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
- ) -> "PGresult":
- ...
+ ) -> "PGresult": ...
def send_query_params(
self,
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
- ) -> None:
- ...
+ ) -> None: ...
def send_prepare(
self,
name: bytes,
command: bytes,
param_types: Optional[Sequence[int]] = None,
- ) -> None:
- ...
+ ) -> None: ...
def send_query_prepared(
self,
param_values: Optional[Sequence[Optional[Buffer]]],
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
- ) -> None:
- ...
+ ) -> None: ...
def prepare(
self,
name: bytes,
command: bytes,
param_types: Optional[Sequence[int]] = None,
- ) -> "PGresult":
- ...
+ ) -> "PGresult": ...
def exec_prepared(
self,
param_values: Optional[Sequence[Buffer]],
param_formats: Optional[Sequence[int]] = None,
result_format: int = 0,
- ) -> "PGresult":
- ...
+ ) -> "PGresult": ...
- def describe_prepared(self, name: bytes) -> "PGresult":
- ...
+ def describe_prepared(self, name: bytes) -> "PGresult": ...
- def send_describe_prepared(self, name: bytes) -> None:
- ...
+ def send_describe_prepared(self, name: bytes) -> None: ...
- def describe_portal(self, name: bytes) -> "PGresult":
- ...
+ def describe_portal(self, name: bytes) -> "PGresult": ...
- def send_describe_portal(self, name: bytes) -> None:
- ...
+ def send_describe_portal(self, name: bytes) -> None: ...
- def get_result(self) -> Optional["PGresult"]:
- ...
+ def get_result(self) -> Optional["PGresult"]: ...
- def consume_input(self) -> None:
- ...
+ def consume_input(self) -> None: ...
- def is_busy(self) -> int:
- ...
+ def is_busy(self) -> int: ...
@property
- def nonblocking(self) -> int:
- ...
+ def nonblocking(self) -> int: ...
@nonblocking.setter
- def nonblocking(self, arg: int) -> None:
- ...
+ def nonblocking(self, arg: int) -> None: ...
- def flush(self) -> int:
- ...
+ def flush(self) -> int: ...
- def set_single_row_mode(self) -> None:
- ...
+ def set_single_row_mode(self) -> None: ...
- def get_cancel(self) -> "PGcancel":
- ...
+ def get_cancel(self) -> "PGcancel": ...
- def notifies(self) -> Optional["PGnotify"]:
- ...
+ def notifies(self) -> Optional["PGnotify"]: ...
- def put_copy_data(self, buffer: Buffer) -> int:
- ...
+ def put_copy_data(self, buffer: Buffer) -> int: ...
- def put_copy_end(self, error: Optional[bytes] = None) -> int:
- ...
+ def put_copy_end(self, error: Optional[bytes] = None) -> int: ...
- def get_copy_data(self, async_: int) -> Tuple[int, memoryview]:
- ...
+ def get_copy_data(self, async_: int) -> Tuple[int, memoryview]: ...
- def trace(self, fileno: int) -> None:
- ...
+ def trace(self, fileno: int) -> None: ...
- def set_trace_flags(self, flags: Trace) -> None:
- ...
+ def set_trace_flags(self, flags: Trace) -> None: ...
- def untrace(self) -> None:
- ...
+ def untrace(self) -> None: ...
def encrypt_password(
self, passwd: bytes, user: bytes, algorithm: Optional[bytes] = None
- ) -> bytes:
- ...
+ ) -> bytes: ...
- def make_empty_result(self, exec_status: int) -> "PGresult":
- ...
+ def make_empty_result(self, exec_status: int) -> "PGresult": ...
@property
- def pipeline_status(self) -> int:
- ...
+ def pipeline_status(self) -> int: ...
- def enter_pipeline_mode(self) -> None:
- ...
+ def enter_pipeline_mode(self) -> None: ...
- def exit_pipeline_mode(self) -> None:
- ...
+ def exit_pipeline_mode(self) -> None: ...
- def pipeline_sync(self) -> None:
- ...
+ def pipeline_sync(self) -> None: ...
- def send_flush_request(self) -> None:
- ...
+ def send_flush_request(self) -> None: ...
class PGresult(Protocol):
- def clear(self) -> None:
- ...
+ def clear(self) -> None: ...
@property
- def status(self) -> int:
- ...
+ def status(self) -> int: ...
@property
- def error_message(self) -> bytes:
- ...
+ def error_message(self) -> bytes: ...
- def error_field(self, fieldcode: int) -> Optional[bytes]:
- ...
+ def error_field(self, fieldcode: int) -> Optional[bytes]: ...
@property
- def ntuples(self) -> int:
- ...
+ def ntuples(self) -> int: ...
@property
- def nfields(self) -> int:
- ...
+ def nfields(self) -> int: ...
- def fname(self, column_number: int) -> Optional[bytes]:
- ...
+ def fname(self, column_number: int) -> Optional[bytes]: ...
- def ftable(self, column_number: int) -> int:
- ...
+ def ftable(self, column_number: int) -> int: ...
- def ftablecol(self, column_number: int) -> int:
- ...
+ def ftablecol(self, column_number: int) -> int: ...
- def fformat(self, column_number: int) -> int:
- ...
+ def fformat(self, column_number: int) -> int: ...
- def ftype(self, column_number: int) -> int:
- ...
+ def ftype(self, column_number: int) -> int: ...
- def fmod(self, column_number: int) -> int:
- ...
+ def fmod(self, column_number: int) -> int: ...
- def fsize(self, column_number: int) -> int:
- ...
+ def fsize(self, column_number: int) -> int: ...
@property
- def binary_tuples(self) -> int:
- ...
+ def binary_tuples(self) -> int: ...
- def get_value(self, row_number: int, column_number: int) -> Optional[bytes]:
- ...
+ def get_value(self, row_number: int, column_number: int) -> Optional[bytes]: ...
@property
- def nparams(self) -> int:
- ...
+ def nparams(self) -> int: ...
- def param_type(self, param_number: int) -> int:
- ...
+ def param_type(self, param_number: int) -> int: ...
@property
- def command_status(self) -> Optional[bytes]:
- ...
+ def command_status(self) -> Optional[bytes]: ...
@property
- def command_tuples(self) -> Optional[int]:
- ...
+ def command_tuples(self) -> Optional[int]: ...
@property
- def oid_value(self) -> int:
- ...
+ def oid_value(self) -> int: ...
- def set_attributes(self, descriptions: List["PGresAttDesc"]) -> None:
- ...
+ def set_attributes(self, descriptions: List["PGresAttDesc"]) -> None: ...
class PGcancel(Protocol):
- def free(self) -> None:
- ...
+ def free(self) -> None: ...
- def cancel(self) -> None:
- ...
+ def cancel(self) -> None: ...
class Conninfo(Protocol):
@classmethod
- def get_defaults(cls) -> List["ConninfoOption"]:
- ...
+ def get_defaults(cls) -> List["ConninfoOption"]: ...
@classmethod
- def parse(cls, conninfo: bytes) -> List["ConninfoOption"]:
- ...
+ def parse(cls, conninfo: bytes) -> List["ConninfoOption"]: ...
@classmethod
- def _options_from_array(cls, opts: Sequence[Any]) -> List["ConninfoOption"]:
- ...
+ def _options_from_array(cls, opts: Sequence[Any]) -> List["ConninfoOption"]: ...
class Escaping(Protocol):
- def __init__(self, conn: Optional[PGconn] = None):
- ...
+ def __init__(self, conn: Optional[PGconn] = None): ...
- def escape_literal(self, data: Buffer) -> bytes:
- ...
+ def escape_literal(self, data: Buffer) -> bytes: ...
- def escape_identifier(self, data: Buffer) -> bytes:
- ...
+ def escape_identifier(self, data: Buffer) -> bytes: ...
- def escape_string(self, data: Buffer) -> bytes:
- ...
+ def escape_string(self, data: Buffer) -> bytes: ...
- def escape_bytea(self, data: Buffer) -> bytes:
- ...
+ def escape_bytea(self, data: Buffer) -> bytes: ...
- def unescape_bytea(self, data: Buffer) -> bytes:
- ...
+ def unescape_bytea(self, data: Buffer) -> bytes: ...
Typically, `!RowMaker` functions are returned by `RowFactory`.
"""
- def __call__(self, __values: Sequence[Any]) -> Row:
- ...
+ def __call__(self, __values: Sequence[Any]) -> Row: ...
class RowFactory(Protocol[Row]):
use the values to create a dictionary for each record.
"""
- def __call__(self, __cursor: "Cursor[Any]") -> RowMaker[Row]:
- ...
+ def __call__(self, __cursor: "Cursor[Any]") -> RowMaker[Row]: ...
class AsyncRowFactory(Protocol[Row]):
Like `RowFactory`, taking an async cursor as argument.
"""
- def __call__(self, __cursor: "AsyncCursor[Any]") -> RowMaker[Row]:
- ...
+ def __call__(self, __cursor: "AsyncCursor[Any]") -> RowMaker[Row]: ...
class BaseRowFactory(Protocol[Row]):
Like `RowFactory`, taking either type of cursor as argument.
"""
- def __call__(self, __cursor: "BaseCursor[Any, Any]") -> RowMaker[Row]:
- ...
+ def __call__(self, __cursor: "BaseCursor[Any, Any]") -> RowMaker[Row]: ...
TupleRow: TypeAlias = Tuple[Any, ...]
*,
scrollable: Optional[bool] = None,
withhold: bool = False,
- ):
- ...
+ ): ...
@overload
def __init__(
row_factory: RowFactory[Row],
scrollable: Optional[bool] = None,
withhold: bool = False,
- ):
- ...
+ ): ...
def __init__(
self,
*,
scrollable: Optional[bool] = None,
withhold: bool = False,
- ):
- ...
+ ): ...
@overload
def __init__(
row_factory: AsyncRowFactory[Row],
scrollable: Optional[bool] = None,
withhold: bool = False,
- ):
- ...
+ ): ...
def __init__(
self,
"""
Adapters for the enum type.
"""
+
from enum import Enum
from typing import Dict, Generic, Optional, Mapping, Sequence
from typing import Tuple, Type, TypeVar, Union, cast
return f"{{{', '.join(map(str, self._ranges))}}}"
@overload
- def __getitem__(self, index: int) -> Range[T]:
- ...
+ def __getitem__(self, index: int) -> Range[T]: ...
@overload
- def __getitem__(self, index: slice) -> "Multirange[T]":
- ...
+ def __getitem__(self, index: slice) -> "Multirange[T]": ...
def __getitem__(self, index: Union[int, slice]) -> "Union[Range[T],Multirange[T]]":
if isinstance(index, int):
return len(self._ranges)
@overload
- def __setitem__(self, index: int, value: Range[T]) -> None:
- ...
+ def __setitem__(self, index: int, value: Range[T]) -> None: ...
@overload
- def __setitem__(self, index: slice, value: Iterable[Range[T]]) -> None:
- ...
+ def __setitem__(self, index: slice, value: Iterable[Range[T]]) -> None: ...
def __setitem__(
self,
],
# Requirements needed for development
"dev": [
- "black >= 23.1.0",
+ "black >= 24.1.0",
"codespell >= 2.2",
"dnspython >= 2.1",
"flake8 >= 4.0",
[flake8]
max-line-length = 88
-ignore = W503, E203
+ignore = W503, E203, E704
[flake8]
max-line-length = 88
-ignore = W503, E203
+ignore = W503, E203, E704
pool.run_task(self)
@abstractmethod
- def _run(self, pool: "ConnectionPool") -> None:
- ...
+ def _run(self, pool: "ConnectionPool") -> None: ...
class StopWorker(MaintenanceTask):
pool.run_task(self)
@abstractmethod
- async def _run(self, pool: "AsyncConnectionPool") -> None:
- ...
+ async def _run(self, pool: "AsyncConnectionPool") -> None: ...
class StopWorker(MaintenanceTask):
pytest-randomly == 3.5.0
# From the 'dev' extra
-black == 23.1.0
+black == 24.1.0
dnspython == 2.1.0
flake8 == 4.0.0
types-setuptools == 57.4.0
"""
A quick and rough performance comparison of text vs. binary Decimal adaptation
"""
+
from random import randrange
from decimal import Decimal
import psycopg
handled by execute() calls when pgconn socket is read-ready, which
happens when the output buffer is full.
"""
+
import argparse
import asyncio
import logging
async with aconn.cursor() as cur:
assert isinstance(cur, MyCursor)
- async with (await aconn.execute("select 1")) as cur:
+ async with await aconn.execute("select 1") as cur:
assert isinstance(cur, MyCursor)
@pytest.mark.parametrize(
"fmt_in",
[
- f
- if f != PyFormat.BINARY
- else pytest.param(f, marks=pytest.mark.crdb_skip("binary decimal"))
+ (
+ f
+ if f != PyFormat.BINARY
+ else pytest.param(f, marks=pytest.mark.crdb_skip("binary decimal"))
+ )
for f in PyFormat
],
)