from __future__ import annotations
-from typing import Any, Type, cast, TYPE_CHECKING
+from typing import Any, cast, TYPE_CHECKING
from . import pq
from . import errors as e
types: TypesRegistry
- _dumpers: dict[PyFormat, dict[type | str, Type[Dumper]]]
- _dumpers_by_oid: list[dict[int, Type[Dumper]]]
- _loaders: list[dict[int, Type[Loader]]]
+ _dumpers: dict[PyFormat, dict[type | str, type[Dumper]]]
+ _dumpers_by_oid: list[dict[int, type[Dumper]]]
+ _loaders: list[dict[int, type[Loader]]]
# Record if a dumper or loader has an optimised version.
_optimised: dict[type, type] = {}
def connection(self) -> "BaseConnection[Any]" | None:
return None
- def register_dumper(self, cls: type | str | None, dumper: Type[Dumper]) -> None:
+ def register_dumper(self, cls: type | str | None, dumper: type[Dumper]) -> None:
"""
Configure the context to use `!dumper` to convert objects of type `!cls`.
self._dumpers_by_oid[dumper.format][dumper.oid] = dumper
- def register_loader(self, oid: int | str, loader: Type["Loader"]) -> None:
+ def register_loader(self, oid: int | str, loader: type[Loader]) -> None:
"""
Configure the context to use `!loader` to convert data of oid `!oid`.
self._loaders[fmt][oid] = loader
- def get_dumper(self, cls: type, format: PyFormat) -> Type["Dumper"]:
+ def get_dumper(self, cls: type, format: PyFormat) -> type[Dumper]:
"""
Return the dumper class for the given type and format.
f" (format: {format.name})"
)
- def get_dumper_by_oid(self, oid: int, format: pq.Format) -> Type["Dumper"]:
+ def get_dumper_by_oid(self, oid: int, format: pq.Format) -> type[Dumper]:
"""
Return the dumper class for the given oid and format.
)
raise e.ProgrammingError(msg)
- def get_loader(self, oid: int, format: pq.Format) -> Type["Loader"] | None:
+ def get_loader(self, oid: int, format: pq.Format) -> type[Loader] | None:
"""
Return the loader class for the given oid and format.
return self._loaders[format].get(oid)
@classmethod
- def _get_optimised(self, cls: Type[RV]) -> Type[RV]:
+ def _get_optimised(self, cls: type[RV]) -> type[RV]:
"""Return the optimised version of a Dumper or Loader class.
Return the input class itself if there is no optimised version.
from psycopg import types
if cls.__module__.startswith(types.__name__):
- new = cast(Type[RV], getattr(_psycopg, cls.__name__, None))
+ new = cast(type[RV], getattr(_psycopg, cls.__name__, None))
if new:
self._optimised[cls] = new
return new
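For context, `register_dumper()` takes the dumper class itself rather than an instance, which is what the `type[Dumper]` annotation expresses. A minimal usage sketch (`Point` and `PointDumper` are hypothetical names, not part of the patch):

    from psycopg import adapters
    from psycopg.adapt import Dumper

    class Point:
        def __init__(self, x: float, y: float) -> None:
            self.x, self.y = x, y

    class PointDumper(Dumper):
        def dump(self, obj: Point) -> bytes:
            # Render the value as text that PostgreSQL can cast.
            return f"({obj.x},{obj.y})".encode()

    # The class object is registered, matching type[Dumper].
    adapters.register_dumper(Point, PointDumper)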
from abc import ABC, abstractmethod
from types import TracebackType
-from typing import Any, Iterator, Type, Tuple, Sequence, TYPE_CHECKING
+from typing import Any, Iterator, Tuple, Sequence, TYPE_CHECKING
from . import pq
from . import errors as e
def __exit__(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
from abc import ABC, abstractmethod
from types import TracebackType
-from typing import Any, AsyncIterator, Type, Tuple, Sequence, TYPE_CHECKING
+from typing import Any, AsyncIterator, Tuple, Sequence, TYPE_CHECKING
from . import pq
from . import errors as e
async def __aexit__(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
from __future__ import annotations
from functools import partial
-from typing import Any, Generic, Iterable, NoReturn, Sequence, Tuple, Type
+from typing import Any, Generic, Iterable, NoReturn, Sequence, Tuple
from typing import TYPE_CHECKING
from . import pq
_tx: "Transformer"
_make_row: RowMaker[Row]
_pgconn: "PGconn"
- _query_cls: Type[PostgresQuery] = PostgresQuery
+ _query_cls: type[PostgresQuery] = PostgresQuery
def __init__(self, connection: ConnectionType):
self._conn = connection
import logging
from types import TracebackType
-from typing import Any, Tuple, Type, TYPE_CHECKING
+from typing import Any, Tuple, TYPE_CHECKING
from . import pq
from . import errors as e
def __exit__(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
async def __aexit__(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
# Copyright (C) 2023 The Psycopg Team
-from typing import Type
+from __future__ import annotations
from . import abc
from ._cmodule import _psycopg
-Transformer: Type[abc.Transformer]
+Transformer: type[abc.Transformer]
if _psycopg:
Transformer = _psycopg.Transformer
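Dropping `typing.Type` here is only safe together with the `from __future__ import annotations` added in the same hunk: under PEP 563 the module-level annotation is stored as a string and never evaluated, so `type[abc.Transformer]` cannot fail at import time even on interpreters where the builtin `type` is not subscriptable. A standalone illustration:

    from __future__ import annotations

    # Stored as the string "type[int]" in __annotations__ and never
    # evaluated, so `type` is never subscripted at runtime.
    klass: type[int]

    print(__annotations__)  # {'klass': 'type[int]'}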
import logging
from time import monotonic
from types import TracebackType
-from typing import Any, Generator, Iterator
-from typing import Type, cast, overload, TYPE_CHECKING
+from typing import Any, Generator, Iterator, cast, overload, TYPE_CHECKING
from contextlib import contextmanager
from . import pq
__module__ = "psycopg"
- cursor_factory: Type[Cursor[Row]]
- server_cursor_factory: Type[ServerCursor[Row]]
+ cursor_factory: type[Cursor[Row]]
+ server_cursor_factory: type[ServerCursor[Row]]
row_factory: RowFactory[Row]
_pipeline: Pipeline | None
prepare_threshold: int | None = 5,
context: AdaptContext | None = None,
row_factory: RowFactory[Row] | None = None,
- cursor_factory: Type[Cursor[Row]] | None = None,
+ cursor_factory: type[Cursor[Row]] | None = None,
**kwargs: ConnParam,
) -> Self:
"""
def __exit__(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
import logging
from time import monotonic
from types import TracebackType
-from typing import Any, AsyncGenerator, AsyncIterator
-from typing import Type, cast, overload, TYPE_CHECKING
+from typing import Any, AsyncGenerator, AsyncIterator, cast, overload, TYPE_CHECKING
from contextlib import asynccontextmanager
from . import pq
__module__ = "psycopg"
- cursor_factory: Type[AsyncCursor[Row]]
- server_cursor_factory: Type[AsyncServerCursor[Row]]
+ cursor_factory: type[AsyncCursor[Row]]
+ server_cursor_factory: type[AsyncServerCursor[Row]]
row_factory: AsyncRowFactory[Row]
_pipeline: AsyncPipeline | None
prepare_threshold: int | None = 5,
context: AdaptContext | None = None,
row_factory: AsyncRowFactory[Row] | None = None,
- cursor_factory: Type[AsyncCursor[Row]] | None = None,
+ cursor_factory: type[AsyncCursor[Row]] | None = None,
**kwargs: ConnParam,
) -> Self:
"""
async def __aexit__(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
from __future__ import annotations
from types import TracebackType
-from typing import Any, Iterator, Iterable, Type, TYPE_CHECKING, overload
+from typing import Any, Iterator, Iterable, TYPE_CHECKING, overload
from contextlib import contextmanager
from . import pq
def __exit__(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
from __future__ import annotations
from types import TracebackType
-from typing import Any, AsyncIterator, Iterable, Type, TYPE_CHECKING, overload
+from typing import Any, AsyncIterator, Iterable, TYPE_CHECKING, overload
from contextlib import asynccontextmanager
from . import pq
async def __aexit__(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
from __future__ import annotations
from dataclasses import dataclass, field, fields
-from typing import Any, Callable, NoReturn, Sequence, Tuple, Type, TYPE_CHECKING
+from typing import Any, Callable, NoReturn, Sequence, Tuple, TYPE_CHECKING
from asyncio import CancelledError
from .pq.abc import PGconn, PGresult
ErrorInfo: TypeAlias = None | PGresult | dict[int, bytes | None]
-_sqlcodes: dict[str, "Type[Error]"] = {}
+_sqlcodes: dict[str, type[Error]] = {}
@dataclass
return info
-def lookup(sqlstate: str) -> Type[Error]:
+def lookup(sqlstate: str) -> type[Error]:
"""Lookup an error code or `constant name`__ and return its exception class.
Raise `!KeyError` if the code is not found.
return hasattr(info, "error_field")
-def _class_for_state(sqlstate: str) -> Type[Error]:
+def _class_for_state(sqlstate: str) -> type[Error]:
try:
return lookup(sqlstate)
except KeyError:
return get_base_exception(sqlstate)
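`lookup()` resolves either a SQLSTATE code or a constant name to an exception class, which is what the `type[Error]` return annotation describes. For example:

    from psycopg import errors

    # Both spellings resolve to the same class object.
    assert errors.lookup("23505") is errors.UniqueViolation
    assert errors.lookup("UNIQUE_VIOLATION") is errors.UniqueViolation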
-def get_base_exception(sqlstate: str) -> Type[Error]:
+def get_base_exception(sqlstate: str) -> type[Error]:
return (
_base_exc_map.get(sqlstate[:2])
or _base_exc_map.get(sqlstate[:1])
import os
import logging
-from typing import Callable, Type
+from typing import Callable
from . import abc
from .misc import ConninfoOption, PGnotify, PGresAttDesc
"""
version: Callable[[], int]
-PGconn: Type[abc.PGconn]
-PGresult: Type[abc.PGresult]
-Conninfo: Type[abc.Conninfo]
-Escaping: Type[abc.Escaping]
-PGcancel: Type[abc.PGcancel]
-PGcancelConn: Type[abc.PGcancelConn]
+PGconn: type[abc.PGconn]
+PGresult: type[abc.PGresult]
+Conninfo: type[abc.Conninfo]
+Escaping: type[abc.Escaping]
+PGcancel: type[abc.PGcancel]
+PGcancelConn: type[abc.PGcancelConn]
def import_from_libpq() -> None:
import functools
from typing import Any, Callable, NamedTuple, NoReturn
-from typing import TYPE_CHECKING, Protocol, Sequence, Tuple, Type
+from typing import TYPE_CHECKING, Protocol, Sequence, Tuple
from collections import namedtuple
from . import pq
@functools.lru_cache(512)
-def _make_nt(enc: str, *names: bytes) -> Type[NamedTuple]:
+def _make_nt(enc: str, *names: bytes) -> type[NamedTuple]:
snames = tuple(_as_python_identifier(n.decode(enc)) for n in names)
return namedtuple("Row", snames) # type: ignore[return-value]
-def class_row(cls: Type[T]) -> BaseRowFactory[T]:
+def class_row(cls: type[T]) -> BaseRowFactory[T]:
r"""Generate a row factory to represent rows as instances of the class `!cls`.
The class must support every output column name as a keyword parameter.
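A short usage sketch for `class_row()`, assuming a reachable database and a throwaway `Person` dataclass:

    from dataclasses import dataclass

    import psycopg
    from psycopg.rows import class_row

    @dataclass
    class Person:
        first_name: str
        last_name: str

    with psycopg.connect() as conn:
        cur = conn.cursor(row_factory=class_row(Person))
        row = cur.execute(
            "SELECT 'John' AS first_name, 'Smith' AS last_name"
        ).fetchone()
        # row == Person(first_name='John', last_name='Smith')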
import logging
from types import TracebackType
-from typing import Generic, Iterator, Type, TYPE_CHECKING
+from typing import Generic, Iterator, TYPE_CHECKING
from . import pq
from . import sql
def _exit_gen(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> PQGen[bool]:
def __exit__(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
async def __aexit__(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
import re
import struct
from math import prod
-from typing import Any, cast, Callable, Pattern, Set, Tuple, Type
+from typing import Any, cast, Callable, Pattern, Set, Tuple
from .. import pq
from .. import errors as e
@cache
-def _make_loader(name: str, oid: int, delimiter: str) -> Type[Loader]:
+def _make_loader(name: str, oid: int, delimiter: str) -> type[Loader]:
# Note: caching this function is really needed because, if the C extension
# is available, the resulting type cannot be GC'd, so calling
# register_array() in a loop results in a leak. See #647.
@cache
def _make_dumper(
name: str, oid: int, array_oid: int, delimiter: str
-) -> Type[BaseListDumper]:
+) -> type[BaseListDumper]:
attribs = {"oid": array_oid, "element_oid": oid, "delimiter": delimiter.encode()}
return type(f"{name.title()}ListDumper", (ListDumper,), attribs)
@cache
-def _make_binary_dumper(name: str, oid: int, array_oid: int) -> Type[BaseListDumper]:
+def _make_binary_dumper(name: str, oid: int, array_oid: int) -> type[BaseListDumper]:
attribs = {"oid": array_oid, "element_oid": oid}
return type(f"{name.title()}ListBinaryDumper", (ListBinaryDumper,), attribs)
import struct
from collections import namedtuple
from typing import Any, Callable, cast, Iterator
-from typing import NamedTuple, Sequence, Tuple, Type, TYPE_CHECKING
+from typing import NamedTuple, Sequence, Tuple, TYPE_CHECKING
from .. import pq
from .. import abc
adapters = context.adapters if context else postgres.adapters
# generate and register a customized text loader
- loader: Type[BaseCompositeLoader]
+ loader: type[BaseCompositeLoader]
loader = _make_loader(info.name, tuple(info.field_types), factory)
adapters.register_loader(info.oid, loader)
# If the factory is a type, create and register dumpers for it
if isinstance(factory, type):
- dumper: Type[Dumper]
+ dumper: type[Dumper]
dumper = _make_binary_dumper(info.name, info.oid, tuple(info.field_types))
adapters.register_dumper(factory, dumper)
adapters.register_loader("record", RecordBinaryLoader)
-def _nt_from_info(info: CompositeInfo) -> Type[NamedTuple]:
+def _nt_from_info(info: CompositeInfo) -> type[NamedTuple]:
name = _as_python_identifier(info.name)
fields = tuple(_as_python_identifier(n) for n in info.field_names)
return _make_nt(name, fields)
@cache
-def _make_nt(name: str, fields: Tuple[str, ...]) -> Type[NamedTuple]:
+def _make_nt(name: str, fields: Tuple[str, ...]) -> type[NamedTuple]:
return namedtuple(name, fields) # type: ignore[return-value]
@cache
def _make_loader(
name: str, types: Tuple[int, ...], factory: Callable[..., Any]
-) -> Type[BaseCompositeLoader]:
+) -> type[BaseCompositeLoader]:
return type(
f"{name.title()}Loader",
(CompositeLoader,),
@cache
def _make_binary_loader(
name: str, factory: Callable[..., Any]
-) -> Type[BaseCompositeLoader]:
+) -> type[BaseCompositeLoader]:
return type(
f"{name.title()}BinaryLoader", (CompositeBinaryLoader,), {"factory": factory}
)
@cache
-def _make_dumper(name: str, oid: int) -> Type[TupleDumper]:
+def _make_dumper(name: str, oid: int) -> type[TupleDumper]:
return type(f"{name.title()}Dumper", (TupleDumper,), {"oid": oid})
@cache
def _make_binary_dumper(
name: str, oid: int, field_types: Tuple[int, ...]
-) -> Type[TupleBinaryDumper]:
+) -> type[TupleBinaryDumper]:
return type(
f"{name.title()}BinaryDumper",
(TupleBinaryDumper,),
from __future__ import annotations
from enum import Enum
-from typing import Any, Generic, Mapping, Sequence, Tuple, Type, cast, TYPE_CHECKING
+from typing import Any, Generic, Mapping, Sequence, Tuple, cast, TYPE_CHECKING
from .. import sql
from .. import postgres
super().__init__(name, oid, array_oid)
self.labels = labels
# Will be set by register_enum()
- self.enum: Type[Enum] | None = None
+ self.enum: type[Enum] | None = None
@classmethod
def _get_info_query(cls, conn: "BaseConnection[Any]") -> Query:
Loader for a specific Enum class
"""
- enum: Type[E]
+ enum: type[E]
_load_map: EnumLoadMap[E]
def load(self, data: Buffer) -> E:
Dumper for a specific Enum class
"""
- enum: Type[E]
+ enum: type[E]
_dump_map: EnumDumpMap[E]
def dump(self, value: E) -> Buffer | None:
def register_enum(
info: EnumInfo,
context: AdaptContext | None = None,
- enum: Type[E] | None = None,
+ enum: type[E] | None = None,
*,
mapping: EnumMapping[E] = None,
) -> None:
raise TypeError("no info passed. Is the requested enum available?")
if enum is None:
- enum = cast(Type[E], _make_enum(info.name, tuple(info.labels)))
+ enum = cast(type[E], _make_enum(info.name, tuple(info.labels)))
info.enum = enum
adapters = context.adapters if context else postgres.adapters
@cache
def _make_loader(
- name: str, enum: Type[Enum], load_map: _HEnumLoadMap[E]
-) -> Type[_BaseEnumLoader[E]]:
+ name: str, enum: type[Enum], load_map: _HEnumLoadMap[E]
+) -> type[_BaseEnumLoader[E]]:
attribs = {"enum": enum, "_load_map": dict(load_map)}
return type(f"{name.title()}Loader", (_BaseEnumLoader,), attribs)
@cache
def _make_binary_loader(
- name: str, enum: Type[Enum], load_map: _HEnumLoadMap[E]
-) -> Type[_BaseEnumLoader[E]]:
+ name: str, enum: type[Enum], load_map: _HEnumLoadMap[E]
+) -> type[_BaseEnumLoader[E]]:
attribs = {"enum": enum, "_load_map": dict(load_map), "format": BINARY}
return type(f"{name.title()}BinaryLoader", (_BaseEnumLoader,), attribs)
@cache
def _make_dumper(
- enum: Type[Enum], oid: int, dump_map: _HEnumDumpMap[E]
-) -> Type[_BaseEnumDumper[E]]:
+ enum: type[Enum], oid: int, dump_map: _HEnumDumpMap[E]
+) -> type[_BaseEnumDumper[E]]:
attribs = {"enum": enum, "oid": oid, "_dump_map": dict(dump_map)}
return type(f"{enum.__name__}Dumper", (_BaseEnumDumper,), attribs)
@cache
def _make_binary_dumper(
- enum: Type[Enum], oid: int, dump_map: _HEnumDumpMap[E]
-) -> Type[_BaseEnumDumper[E]]:
+ enum: type[Enum], oid: int, dump_map: _HEnumDumpMap[E]
+) -> type[_BaseEnumDumper[E]]:
attribs = {"enum": enum, "oid": oid, "_dump_map": dict(dump_map), "format": BINARY}
return type(f"{enum.__name__}BinaryDumper", (_BaseEnumDumper,), attribs)
def _make_load_map(
- info: EnumInfo,
- enum: Type[E],
- mapping: EnumMapping[E],
- context: AdaptContext | None,
+ info: EnumInfo, enum: type[E], mapping: EnumMapping[E], context: AdaptContext | None
) -> _HEnumLoadMap[E]:
enc = conn_encoding(context.connection if context else None)
rv = []
def _make_dump_map(
- info: EnumInfo,
- enum: Type[E],
- mapping: EnumMapping[E],
- context: AdaptContext | None,
+ info: EnumInfo, enum: type[E], mapping: EnumMapping[E], context: AdaptContext | None
) -> _HEnumDumpMap[E]:
enc = conn_encoding(context.connection if context else None)
rv = []
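`register_enum()`, whose `enum` parameter is now `type[E] | None`, follows the usual fetch-then-register shape. A sketch assuming a `user_role` enum type exists in the database and `conn` is an open connection:

    from enum import Enum, auto

    from psycopg.types.enum import EnumInfo, register_enum

    class UserRole(Enum):  # member names must match the Postgres labels
        ADMIN = auto()
        EDITOR = auto()

    info = EnumInfo.fetch(conn, "user_role")
    register_enum(info, conn, UserRole)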
from __future__ import annotations
import re
-from typing import Type
from .. import errors as e
from .. import postgres
@cache
-def _make_hstore_dumper(oid_in: int) -> Type[BaseHstoreDumper]:
+def _make_hstore_dumper(oid_in: int) -> type[BaseHstoreDumper]:
"""
Return an hstore dumper class configured using `oid_in`.
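`_make_hstore_dumper()` backs the public `register_hstore()` entry point, which is used like the other extension types. A sketch assuming the hstore extension is installed and `conn` is an open connection:

    from psycopg.types import TypeInfo
    from psycopg.types.hstore import register_hstore

    info = TypeInfo.fetch(conn, "hstore")
    register_hstore(info, conn)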
from __future__ import annotations
import json
-from typing import Any, Callable, Tuple, Type
+from typing import Any, Callable, Tuple
from .. import abc
from .. import _oids
@cache
-def _make_dumper(base: Type[abc.Dumper], dumps: JsonDumpsFunction) -> Type[abc.Dumper]:
+def _make_dumper(base: type[abc.Dumper], dumps: JsonDumpsFunction) -> type[abc.Dumper]:
name = base.__name__
if not name.startswith("Custom"):
name = f"Custom{name}"
@cache
-def _make_loader(base: Type[Loader], loads: JsonLoadsFunction) -> Type[Loader]:
+def _make_loader(base: type[Loader], loads: JsonLoadsFunction) -> type[Loader]:
name = base.__name__
if not name.startswith("Custom"):
name = f"Custom{name}"
def _get_current_dumper(
adapters: AdaptersMap, cls: type, format: PyFormat
-) -> Type[abc.Dumper]:
+) -> type[abc.Dumper]:
try:
return adapters.get_dumper(cls, format)
except e.ProgrammingError:
return _default_dumpers[cls, format]
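The customised classes produced by `_make_dumper()`/`_make_loader()` back `set_json_dumps()` and `set_json_loads()`, which swap the JSON (de)serialisation functions globally or per context; for instance:

    import json
    from functools import partial

    from psycopg.types.json import set_json_dumps, set_json_loads

    # Compact separators for every Json/Jsonb value dumped from now on.
    set_json_dumps(partial(json.dumps, separators=(",", ":")))
    set_json_loads(json.loads)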
-_default_dumpers: dict[Tuple[Type[_JsonWrapper], PyFormat], Type[Dumper]] = {
+_default_dumpers: dict[Tuple[type[_JsonWrapper], PyFormat], type[Dumper]] = {
(Json, PyFormat.BINARY): JsonBinaryDumper,
(Json, PyFormat.TEXT): JsonDumper,
(Jsonb, PyFormat.BINARY): JsonbBinaryDumper,
from __future__ import annotations
from decimal import Decimal
-from typing import Any, Generic, Iterable, MutableSequence
-from typing import Type, overload, TYPE_CHECKING
+from typing import Any, Generic, Iterable, MutableSequence, overload, TYPE_CHECKING
from datetime import date, datetime
from .. import sql
adapters = context.adapters if context else postgres.adapters
# generate and register a customized text loader
- loader: Type[BaseMultirangeLoader[Any]]
+ loader: type[BaseMultirangeLoader[Any]]
loader = _make_loader(info.name, info.subtype_oid)
adapters.register_loader(info.oid, loader)
@cache
-def _make_loader(name: str, oid: int) -> Type[MultirangeLoader[Any]]:
+def _make_loader(name: str, oid: int) -> type[MultirangeLoader[Any]]:
return type(f"{name.title()}Loader", (MultirangeLoader,), {"subtype_oid": oid})
@cache
-def _make_binary_loader(name: str, oid: int) -> Type[MultirangeBinaryLoader[Any]]:
+def _make_binary_loader(name: str, oid: int) -> type[MultirangeBinaryLoader[Any]]:
return type(
f"{name.title()}BinaryLoader", (MultirangeBinaryLoader,), {"subtype_oid": oid}
)
from __future__ import annotations
-from typing import Callable, Type, TYPE_CHECKING
+from typing import Callable, TYPE_CHECKING
from .. import _oids
from ..pq import Format
ip_address: Callable[[str], Address] = None # type: ignore[assignment]
ip_interface: Callable[[str], Interface] = None # type: ignore[assignment]
ip_network: Callable[[str], Network] = None # type: ignore[assignment]
-IPv4Address: "Type[ipaddress.IPv4Address]" = None # type: ignore[assignment]
-IPv6Address: "Type[ipaddress.IPv6Address]" = None # type: ignore[assignment]
-IPv4Interface: "Type[ipaddress.IPv4Interface]" = None # type: ignore[assignment]
-IPv6Interface: "Type[ipaddress.IPv6Interface]" = None # type: ignore[assignment]
-IPv4Network: "Type[ipaddress.IPv4Network]" = None # type: ignore[assignment]
-IPv6Network: "Type[ipaddress.IPv6Network]" = None # type: ignore[assignment]
+IPv4Address: "type[ipaddress.IPv4Address]" = None # type: ignore[assignment]
+IPv6Address: "type[ipaddress.IPv6Address]" = None # type: ignore[assignment]
+IPv4Interface: "type[ipaddress.IPv4Interface]" = None # type: ignore[assignment]
+IPv6Interface: "type[ipaddress.IPv6Interface]" = None # type: ignore[assignment]
+IPv4Network: "type[ipaddress.IPv4Network]" = None # type: ignore[assignment]
+IPv6Network: "type[ipaddress.IPv6Network]" = None # type: ignore[assignment]
PGSQL_AF_INET = 2
PGSQL_AF_INET6 = 3
from __future__ import annotations
import re
-from typing import Any, Generic, Type, Tuple, cast, TYPE_CHECKING
+from typing import Any, Generic, Tuple, cast, TYPE_CHECKING
from decimal import Decimal
from datetime import date, datetime
adapters = context.adapters if context else postgres.adapters
# generate and register a customized text loader
- loader: Type[BaseRangeLoader[Any]]
+ loader: type[BaseRangeLoader[Any]]
loader = _make_loader(info.name, info.subtype_oid)
adapters.register_loader(info.oid, loader)
@cache
-def _make_loader(name: str, oid: int) -> Type[RangeLoader[Any]]:
+def _make_loader(name: str, oid: int) -> type[RangeLoader[Any]]:
return type(f"{name.title()}Loader", (RangeLoader,), {"subtype_oid": oid})
@cache
-def _make_binary_loader(name: str, oid: int) -> Type[RangeBinaryLoader[Any]]:
+def _make_binary_loader(name: str, oid: int) -> type[RangeBinaryLoader[Any]]:
return type(
f"{name.title()}BinaryLoader", (RangeBinaryLoader,), {"subtype_oid": oid}
)
from __future__ import annotations
-from typing import Type
-
from .. import postgres
from ..abc import AdaptContext, Buffer
from ..adapt import Dumper, Loader
@cache
-def _make_dumper(oid_in: int) -> Type[BaseGeometryDumper]:
+def _make_dumper(oid_in: int) -> type[BaseGeometryDumper]:
class GeometryDumper(BaseGeometryDumper):
oid = oid_in
@cache
-def _make_binary_dumper(oid_in: int) -> Type[BaseGeometryBinaryDumper]:
+def _make_binary_dumper(oid_in: int) -> type[BaseGeometryBinaryDumper]:
class GeometryBinaryDumper(BaseGeometryBinaryDumper):
oid = oid_in
# Copyright (C) 2021 The Psycopg Team
+from __future__ import annotations
+
import sys
-from typing import Type
import psycopg.errors as e
# Workaround for psycopg < 3.0.8.
# Timeout on NullPool connections might not work correctly.
try:
- ConnectionTimeout: Type[e.OperationalError] = e.ConnectionTimeout
+ ConnectionTimeout: type[e.OperationalError] = e.ConnectionTimeout
except AttributeError:
class DummyConnectionTimeout(e.OperationalError):
from __future__ import annotations
import logging
-from typing import Any, cast, Type
+from typing import Any, cast
from psycopg import Connection
from psycopg.pq import TransactionStatus
self,
conninfo: str = "",
*,
- connection_class: Type[CT] = cast(Type[CT], Connection),
+ connection_class: type[CT] = cast(type[CT], Connection),
kwargs: dict[str, Any] | None = None,
min_size: int = 0,
max_size: int | None = None,
from __future__ import annotations
import logging
-from typing import Any, cast, Type
+from typing import Any, cast
from psycopg import AsyncConnection
from psycopg.pq import TransactionStatus
self,
conninfo: str = "",
*,
- connection_class: Type[ACT] = cast(Type[ACT], AsyncConnection),
+ connection_class: type[ACT] = cast(type[ACT], AsyncConnection),
kwargs: dict[str, Any] | None = None,
min_size: int = 0, # Note: min_size default value changed to 0.
max_size: int | None = None,
from time import monotonic
from types import TracebackType
from typing import Any, Iterator, cast, Generic
-from typing import Type
from weakref import ref
from contextlib import contextmanager
self,
conninfo: str = "",
*,
- connection_class: Type[CT] = cast(Type[CT], Connection),
+ connection_class: type[CT] = cast(type[CT], Connection),
kwargs: dict[str, Any] | None = None,
min_size: int = 4,
max_size: int | None = None,
def __exit__(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
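The `connection_class` argument, now annotated `type[CT]`, accepts a `Connection` subclass. A brief sketch with a hypothetical subclass:

    from psycopg import Connection
    from psycopg_pool import ConnectionPool

    class MyConnection(Connection):
        pass  # e.g. carrying project-specific helpers

    with ConnectionPool("", connection_class=MyConnection) as pool:
        with pool.connection() as conn:
            conn.execute("SELECT 1")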
from time import monotonic
from types import TracebackType
from typing import Any, AsyncIterator, cast, Generic
-from typing import Type
from weakref import ref
from contextlib import asynccontextmanager
self,
conninfo: str = "",
*,
- connection_class: Type[ACT] = cast(Type[ACT], AsyncConnection),
+ connection_class: type[ACT] = cast(type[ACT], AsyncConnection),
kwargs: dict[str, Any] | None = None,
min_size: int = 4,
max_size: int | None = None,
async def __aexit__(
self,
- exc_type: Type[BaseException] | None,
+ exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None: