# Copyright (C) 2020 The Psycopg Team
+from __future__ import annotations
+
import os
import re
import logging
import importlib
-from typing import Dict
from collections import deque
# Classes which may have __module__ overwritten
-recovered_classes: Dict[type, str] = {}
+recovered_classes: dict[type, str] = {}
def recover_defined_module(m, skip_modules=()):
from __future__ import annotations
-from typing import Any, Dict, Type, cast, TYPE_CHECKING
+from typing import Any, Type, cast, TYPE_CHECKING
from . import pq
from . import errors as e
types: TypesRegistry
- _dumpers: Dict[PyFormat, Dict[type | str, Type[Dumper]]]
- _dumpers_by_oid: list[Dict[int, Type[Dumper]]]
- _loaders: list[Dict[int, Type[Loader]]]
+ _dumpers: dict[PyFormat, dict[type | str, Type[Dumper]]]
+ _dumpers_by_oid: list[dict[int, Type[Dumper]]]
+ _loaders: list[dict[int, Type[Loader]]]
# Record if a dumper or loader has an optimised version.
- _optimised: Dict[type, type] = {}
+ _optimised: dict[type, type] = {}
def __init__(
self,
# Copyright (C) 2024 The Psycopg Team
-from typing import Dict
+from __future__ import annotations
from . import pq
from . import _cmodule
"""
def __init__(self) -> None:
- self._cache: Dict[str, str] = {}
+ self._cache: dict[str, str] = {}
def has_encrypt_password(self, check: bool = False) -> bool:
"""Check if the `PGconn.encrypt_password()` method is implemented.
import sys
import struct
from abc import ABC, abstractmethod
-from typing import Any, Dict, Generic, Match, Sequence, Tuple, TYPE_CHECKING
+from typing import Any, Generic, Match, Sequence, Tuple, TYPE_CHECKING
from . import pq
from . import adapt
}
-def _dump_sub(m: Match[bytes], __map: Dict[bytes, bytes] = _dump_repl) -> bytes:
+def _dump_sub(m: Match[bytes], __map: dict[bytes, bytes] = _dump_repl) -> bytes:
return __map[m.group(0)]
_load_repl = {v: k for k, v in _dump_repl.items()}
-def _load_sub(m: Match[bytes], __map: Dict[bytes, bytes] = _load_repl) -> bytes:
+def _load_sub(m: Match[bytes], __map: dict[bytes, bytes] = _load_repl) -> bytes:
return __map[m.group(0)]
import re
import warnings
from random import randint
-from typing import Any, DefaultDict, Dict, NamedTuple, Sequence, TYPE_CHECKING
+from typing import Any, DefaultDict, NamedTuple, Sequence, TYPE_CHECKING
from collections import defaultdict
try:
async_resolver.cache = Cache()
-async def resolve_hostaddr_async(params: Dict[str, Any]) -> Dict[str, Any]:
+async def resolve_hostaddr_async(params: dict[str, Any]) -> dict[str, Any]:
"""
Perform async DNS lookup of the hosts and return a new params dict.
return out
-def resolve_srv(params: Dict[str, Any]) -> Dict[str, Any]:
+def resolve_srv(params: dict[str, Any]) -> dict[str, Any]:
"""Apply SRV DNS lookup as defined in :RFC:`2782`."""
return Rfc2782Resolver().resolve(params)
-async def resolve_srv_async(params: Dict[str, Any]) -> Dict[str, Any]:
+async def resolve_srv_async(params: dict[str, Any]) -> dict[str, Any]:
"""Async equivalent of `resolve_srv()`."""
return await Rfc2782Resolver().resolve_async(params)
re_srv_rr = re.compile(r"^(?P<service>_[^\.]+)\.(?P<proto>_[^\.]+)\.(?P<target>.+)")
- def resolve(self, params: Dict[str, Any]) -> Dict[str, Any]:
+ def resolve(self, params: dict[str, Any]) -> dict[str, Any]:
"""Update the parameters host and port after SRV lookup."""
attempts = self._get_attempts(params)
if not attempts:
return self._return_params(params, hps)
- async def resolve_async(self, params: Dict[str, Any]) -> Dict[str, Any]:
+ async def resolve_async(self, params: dict[str, Any]) -> dict[str, Any]:
"""Update the parameters host and port after SRV lookup."""
attempts = self._get_attempts(params)
if not attempts:
return self._return_params(params, hps)
- def _get_attempts(self, params: Dict[str, Any]) -> list[HostPort]:
+ def _get_attempts(self, params: dict[str, Any]) -> list[HostPort]:
"""
Return the list of host, and for each host if SRV lookup must be tried.
]
def _return_params(
- self, params: Dict[str, Any], hps: list[HostPort]
- ) -> Dict[str, Any]:
+ self, params: dict[str, Any], hps: list[HostPort]
+ ) -> dict[str, Any]:
if not hps:
# Nothing found, we ended up with an empty list
raise e.OperationalError("no host found after SRV RR lookup")
import re
import string
import codecs
-from typing import Any, Dict, TYPE_CHECKING
+from typing import Any, TYPE_CHECKING
from .pq._enums import ConnStatus
from .errors import NotSupportedError
"WIN874": "cp874",
}
-py_codecs: Dict[bytes, str] = {}
+py_codecs: dict[bytes, str] = {}
py_codecs.update((k.encode(), v) for k, v in _py_codecs.items())
# Add an alias without underscore, for lenient lookups
from __future__ import annotations
-from typing import Any, Dict, Sequence, Tuple, DefaultDict, TYPE_CHECKING
+from typing import Any, Sequence, Tuple, DefaultDict, TYPE_CHECKING
from collections import defaultdict
from . import pq
from .pq.abc import PGresult
from ._connection_base import BaseConnection
-DumperCache: TypeAlias = Dict[DumperKey, abc.Dumper]
-OidDumperCache: TypeAlias = Dict[int, abc.Dumper]
-LoaderCache: TypeAlias = Dict[int, abc.Loader]
+DumperCache: TypeAlias = dict[DumperKey, abc.Dumper]
+OidDumperCache: TypeAlias = dict[int, abc.Dumper]
+LoaderCache: TypeAlias = dict[int, abc.Loader]
TEXT = pq.Format.TEXT
PY_TEXT = PyFormat.TEXT
self._row_loaders: list[LoadFunc] = []
# mapping oid -> type sql representation
- self._oid_types: Dict[int, bytes] = {}
+ self._oid_types: dict[int, bytes] = {}
self._encoding = ""
from __future__ import annotations
import re
-from typing import Any, Callable, Dict, Mapping, Match, NamedTuple
+from typing import Any, Callable, Mapping, Match, NamedTuple
from typing import Sequence, Tuple, TYPE_CHECKING
from functools import lru_cache
formats.append(part.format)
elif isinstance(parts[0].item, str):
- seen: Dict[str, Tuple[bytes, PyFormat]] = {}
+ seen: dict[str, Tuple[bytes, PyFormat]] = {}
order = []
for part in parts[:-1]:
assert isinstance(part.item, str)
chunks.append(b"%s")
elif isinstance(parts[0].item, str):
- seen: Dict[str, Tuple[bytes, PyFormat]] = {}
+ seen: dict[str, Tuple[bytes, PyFormat]] = {}
order = []
for part in parts[:-1]:
assert isinstance(part.item, str)
from __future__ import annotations
import logging
-from typing import Dict
from datetime import timezone, tzinfo
from .pq.abc import PGconn
logger = logging.getLogger("psycopg")
-_timezones: Dict[bytes | None, tzinfo] = {
+_timezones: dict[bytes | None, tzinfo] = {
None: timezone.utc,
b"UTC": timezone.utc,
}
from __future__ import annotations
-from typing import Any, Dict, Callable, Generator, Mapping
+from typing import Any, Callable, Generator, Mapping
from typing import Protocol, Sequence, Tuple, TYPE_CHECKING
from . import pq
PipelineCommand: TypeAlias = Callable[[], None]
DumperKey: TypeAlias = type | Tuple["DumperKey", ...]
ConnParam: TypeAlias = str | int | None
-ConnDict: TypeAlias = Dict[str, ConnParam]
+ConnDict: TypeAlias = dict[str, ConnParam]
ConnMapping: TypeAlias = Mapping[str, ConnParam]
from __future__ import annotations
from dataclasses import dataclass, field, fields
-from typing import Any, Callable, Dict, NoReturn, Sequence, Tuple, Type, TYPE_CHECKING
+from typing import Any, Callable, NoReturn, Sequence, Tuple, Type, TYPE_CHECKING
from asyncio import CancelledError
from .pq.abc import PGconn, PGresult
if TYPE_CHECKING:
from .pq.misc import PGnotify, ConninfoOption
-ErrorInfo: TypeAlias = None | PGresult | Dict[int, bytes | None]
+ErrorInfo: TypeAlias = None | PGresult | dict[int, bytes | None]
-_sqlcodes: Dict[str, "Type[Error]"] = {}
+_sqlcodes: dict[str, "Type[Error]"] = {}
@dataclass
from __future__ import annotations
import functools
-from typing import Any, Callable, Dict, NamedTuple, NoReturn
+from typing import Any, Callable, NamedTuple, NoReturn
from typing import TYPE_CHECKING, Protocol, Sequence, Tuple, Type
from collections import namedtuple
"""
-DictRow: TypeAlias = Dict[str, Any]
+DictRow: TypeAlias = dict[str, Any]
"""
An alias for the type returned by `dict_row()`
if names is None:
return no_result
- def dict_row_(values: Sequence[Any]) -> Dict[str, Any]:
+ def dict_row_(values: Sequence[Any]) -> dict[str, Any]:
return dict(zip(names, values))
return dict_row_
import re
import struct
from collections import namedtuple
-from typing import Any, Callable, cast, Dict, Iterator
+from typing import Any, Callable, cast, Iterator
from typing import NamedTuple, Sequence, Tuple, Type, TYPE_CHECKING
from .. import pq
# Usually there will be only one, but if there is more than one
# row in the same query (in different columns, or even in different
# records), oids might differ and we'd need separate transformers.
- self._txs: Dict[Tuple[int, ...], abc.Transformer] = {}
+ self._txs: dict[Tuple[int, ...], abc.Transformer] = {}
def load(self, data: abc.Buffer) -> Tuple[Any, ...]:
nfields = unpack_len(data, 0)[0]
from __future__ import annotations
from enum import Enum
-from typing import Any, Dict, Generic, Mapping, Sequence
-from typing import Tuple, Type, cast, TYPE_CHECKING
+from typing import Any, Generic, Mapping, Sequence, Tuple, Type, cast, TYPE_CHECKING
from .. import sql
from .. import postgres
E = TypeVar("E", bound=Enum)
-EnumDumpMap: TypeAlias = Dict[E, bytes]
-EnumLoadMap: TypeAlias = Dict[bytes, E]
+EnumDumpMap: TypeAlias = dict[E, bytes]
+EnumLoadMap: TypeAlias = dict[bytes, E]
EnumMapping: TypeAlias = Mapping[E, str] | Sequence[Tuple[E, str]] | None
# Hashable versions
"""
Dict to hstore adaptation
"""
# Copyright (C) 2021 The Psycopg Team
from __future__ import annotations
import re
-from typing import Dict, Type
+from typing import Type
from .. import errors as e
from .. import postgres
)
-Hstore: TypeAlias = Dict[str, str | None]
+Hstore: TypeAlias = dict[str, str | None]
class BaseHstoreDumper(RecursiveDumper):
from __future__ import annotations
import json
-from typing import Any, Callable, Dict, Tuple, Type
+from typing import Any, Callable, Tuple, Type
from .. import abc
from .. import _oids
return _default_dumpers[cls, format]
-_default_dumpers: Dict[Tuple[Type[_JsonWrapper], PyFormat], Type[Dumper]] = {
+_default_dumpers: dict[Tuple[Type[_JsonWrapper], PyFormat], Type[Dumper]] = {
(Json, PyFormat.BINARY): JsonBinaryDumper,
(Json, PyFormat.TEXT): JsonDumper,
(Jsonb, PyFormat.BINARY): JsonbBinaryDumper,
import struct
from abc import ABC, abstractmethod
from math import log
-from typing import Any, Callable, DefaultDict, Dict, Tuple, cast, TYPE_CHECKING
+from typing import Any, Callable, DefaultDict, Tuple, cast, TYPE_CHECKING
from decimal import Decimal, DefaultContext, Context
from .. import _oids
class _SpecialValuesDumper(Dumper):
- _special: Dict[bytes, bytes] = {}
+ _special: dict[bytes, bytes] = {}
def dump(self, obj: Any) -> Buffer | None:
return str(obj).encode()
from __future__ import annotations
import re
-from typing import Any, Dict, Generic, Type, Tuple
-from typing import cast, TYPE_CHECKING
+from typing import Any, Generic, Type, Tuple, cast, TYPE_CHECKING
from decimal import Decimal
from datetime import date, datetime
def __ge__(self, other: Any) -> bool:
return self == other or self > other # type: ignore
- def __getstate__(self) -> Dict[str, Any]:
+ def __getstate__(self) -> dict[str, Any]:
return {
slot: getattr(self, slot) for slot in self.__slots__ if hasattr(self, slot)
}
- def __setstate__(self, state: Dict[str, Any]) -> None:
+ def __setstate__(self, state: dict[str, Any]) -> None:
for slot, value in state.items():
setattr(self, slot, value)
from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM
from cpython.object cimport PyObject, PyObject_CallFunctionObjArgs
-from typing import Any, Dict, Iterable, Sequence, Tuple
+from typing import Any, Iterable, Sequence, Tuple
from psycopg import errors as e
from psycopg.pq import Format as PqFormat
from time import monotonic
from random import random
-from typing import Any, Dict, Tuple, TYPE_CHECKING
+from typing import Any, Tuple, TYPE_CHECKING
from psycopg import errors as e
self,
conninfo: str = "",
*,
- kwargs: Dict[str, Any] | None,
+ kwargs: dict[str, Any] | None,
min_size: int,
max_size: int | None,
name: str | None,
raise ValueError("num_workers must be at least 1")
self.conninfo = conninfo
- self.kwargs: Dict[str, Any] = kwargs or {}
+ self.kwargs: dict[str, Any] = kwargs or {}
self.name = name
self._min_size = min_size
self._max_size = max_size
f"can't return connection to pool {self.name!r}, {msg}: {conn}"
)
- def get_stats(self) -> Dict[str, int]:
+ def get_stats(self) -> dict[str, int]:
"""
Return current stats about the pool usage.
"""
rv.update(self._get_measures())
return rv
- def pop_stats(self) -> Dict[str, int]:
+ def pop_stats(self) -> dict[str, int]:
"""
Return current stats about the pool usage.
rv.update(self._get_measures())
return rv
- def _get_measures(self) -> Dict[str, int]:
+ def _get_measures(self) -> dict[str, int]:
"""
Return immediate measures of the pool (not counters).
"""
from __future__ import annotations
import logging
-from typing import Any, cast, Dict, Type
+from typing import Any, cast, Type
from psycopg import Connection
from psycopg.pq import TransactionStatus
conninfo: str = "",
*,
connection_class: Type[CT] = cast(Type[CT], Connection),
- kwargs: Dict[str, Any] | None = None,
+ kwargs: dict[str, Any] | None = None,
min_size: int = 0,
max_size: int | None = None,
open: bool | None = None,
from __future__ import annotations
import logging
-from typing import Any, cast, Dict, Type
+from typing import Any, cast, Type
from psycopg import AsyncConnection
from psycopg.pq import TransactionStatus
conninfo: str = "",
*,
connection_class: Type[ACT] = cast(Type[ACT], AsyncConnection),
- kwargs: Dict[str, Any] | None = None,
+ kwargs: dict[str, Any] | None = None,
min_size: int = 0, # Note: min_size default value changed to 0.
max_size: int | None = None,
open: bool | None = None,
from abc import ABC, abstractmethod
from time import monotonic
from types import TracebackType
-from typing import Any, Iterator, cast, Dict, Generic
+from typing import Any, Iterator, cast, Generic
from typing import Type
from weakref import ref
from contextlib import contextmanager
conninfo: str = "",
*,
connection_class: Type[CT] = cast(Type[CT], Connection),
- kwargs: Dict[str, Any] | None = None,
+ kwargs: dict[str, Any] | None = None,
min_size: int = 4,
max_size: int | None = None,
open: bool | None = None,
)
to_close.close()
- def _get_measures(self) -> Dict[str, int]:
+ def _get_measures(self) -> dict[str, int]:
rv = super()._get_measures()
rv[self._REQUESTS_WAITING] = len(self._waiting)
return rv
from abc import ABC, abstractmethod
from time import monotonic
from types import TracebackType
-from typing import Any, AsyncIterator, cast, Dict, Generic
+from typing import Any, AsyncIterator, cast, Generic
from typing import Type
from weakref import ref
from contextlib import asynccontextmanager
conninfo: str = "",
*,
connection_class: Type[ACT] = cast(Type[ACT], AsyncConnection),
- kwargs: Dict[str, Any] | None = None,
+ kwargs: dict[str, Any] | None = None,
min_size: int = 4,
max_size: int | None = None,
open: bool | None = None,
)
await to_close.close()
- def _get_measures(self) -> Dict[str, int]:
+ def _get_measures(self) -> dict[str, int]:
rv = super()._get_measures()
rv[self._REQUESTS_WAITING] = len(self._waiting)
return rv
import asyncio
import selectors
import sys
-from typing import Any, Dict
+from typing import Any
import pytest
cache.set("segfault", True)
-asyncio_options: Dict[str, Any] = {}
+asyncio_options: dict[str, Any] = {}
if sys.platform == "win32":
asyncio_options["loop_factory"] = (
asyncio.WindowsSelectorEventLoopPolicy().new_event_loop
import unittest
import time
import sys
-from typing import Any, Dict
+from typing import Any
# Revision 1.12 2009/02/06 03:35:11 kf7xm
# method is to be found
driver: Any = None
connect_args = () # List of arguments to pass to connect
- connect_kw_args: Dict[Any, Any] = {} # Keyword arguments for connect
+ connect_kw_args: dict[Any, Any] = {} # Keyword arguments for connect
table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables
ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix
import logging
import weakref
from time import time
-from typing import Any, Dict, Tuple
+from typing import Any, Tuple
import pytest
pool.ConnectionPool(min_size=min_size, max_size=max_size)
-class MyRow(Dict[str, Any]):
+class MyRow(dict[str, Any]):
pass
import logging
import weakref
from time import time
-from typing import Any, Dict, Tuple
+from typing import Any, Tuple
import pytest
pool.AsyncConnectionPool(min_size=min_size, max_size=max_size)
-class MyRow(Dict[str, Any]):
+class MyRow(dict[str, Any]):
pass
from __future__ import annotations
import logging
-from typing import Any, Dict
+from typing import Any
import pytest
from packaging.version import parse as ver # noqa: F401 # used in skipif
pool.NullConnectionPool(min_size=min_size, max_size=max_size)
-class MyRow(Dict[str, Any]):
+class MyRow(dict[str, Any]):
pass
from __future__ import annotations
import logging
-from typing import Any, Dict
+from typing import Any
import pytest
from packaging.version import parse as ver # noqa: F401 # used in skipif
pool.AsyncNullConnectionPool(min_size=min_size, max_size=max_size)
-class MyRow(Dict[str, Any]):
+class MyRow(dict[str, Any]):
pass
import asyncio
import logging
from enum import Enum
-from typing import Any, Dict, Generator
+from typing import Any, Generator
from argparse import ArgumentParser, Namespace
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor
ids: list[int] = []
-data: list[Dict[str, Any]] = []
+data: list[dict[str, Any]] = []
def main() -> None:
import pytest
import datetime as dt
-from typing import Any, Dict
+from typing import Any
import psycopg
from psycopg.conninfo import conninfo_to_dict
class PsycopgTests(dbapi20.DatabaseAPI20Test):
driver = psycopg
# connect_args = () # set by the fixture
- connect_kw_args: Dict[Any, Any] = {}
+ connect_kw_args: dict[Any, Any] = {}
def test_nextset(self):
# tested elsewhere
from __future__ import annotations
from dataclasses import dataclass
-from typing import Any, Callable, Dict, Sequence, Tuple
+from typing import Any, Callable, Sequence, Tuple
from psycopg import Connection, Cursor, ServerCursor, connect, rows
from psycopg import AsyncConnection, AsyncCursor, AsyncServerCursor
v1: Tuple[Any, ...] = conn1.execute("").fetchall()[0]
conn2 = connect(row_factory=rows.dict_row)
- v2: Dict[str, Any] = conn2.execute("").fetchall()[0]
+ v2: dict[str, Any] = conn2.execute("").fetchall()[0]
conn3 = connect(row_factory=rows.class_row(Person))
v3: Person = conn3.execute("").fetchall()[0]