import sys
-from asyncio import to_thread
-from zoneinfo import ZoneInfo
-from functools import cache
-from collections import Counter, deque as Deque
-
-
if sys.version_info >= (3, 10):
from typing import TypeGuard, TypeAlias
else:
from typing_extensions import TypeVar
__all__ = [
- "Counter",
- "Deque",
"LiteralString",
"Self",
"TypeAlias",
"TypeGuard",
"TypeVar",
- "ZoneInfo",
- "cache",
- "to_thread",
]
from weakref import ref, ReferenceType
from warnings import warn
from functools import partial
+from collections import deque
from . import pq
from . import errors as e
from .rows import Row
from .adapt import AdaptersMap
from ._enums import IsolationLevel
-from ._compat import Deque, LiteralString, Self, TypeAlias, TypeVar
+from ._compat import LiteralString, Self, TypeAlias, TypeVar
from .pq.misc import connection_summary
from ._pipeline import BasePipeline
from ._preparing import PrepareManager
pgconn.notify_handler = partial(BaseConnection._notify_handler, wself)
# Gather notifies when the notifies() generator is not running.
- self._notifies_backlog = Deque[Notify]()
+ self._notifies_backlog = deque[Notify]()
self._notifies_backlog_handler = partial(
BaseConnection._add_notify_to_backlog, wself
)
import string
import codecs
from typing import Any, TYPE_CHECKING
+from functools import cache
from .pq._enums import ConnStatus
from .errors import NotSupportedError
-from ._compat import cache
if TYPE_CHECKING:
from ._connection_base import BaseConnection
import logging
from types import TracebackType
from typing import Any, TYPE_CHECKING
+from collections import deque
from . import pq
from . import errors as e
from .abc import PipelineCommand, PQGen
-from ._compat import Deque, Self, TypeAlias
+from ._compat import Self, TypeAlias
from .pq.misc import connection_summary
from .generators import pipeline_communicate, fetch_many, send
from ._capabilities import capabilities
class BasePipeline:
- command_queue: Deque[PipelineCommand]
- result_queue: Deque[PendingResult]
+ command_queue: deque[PipelineCommand]
+ result_queue: deque[PendingResult]
def __init__(self, conn: BaseConnection[Any]) -> None:
self._conn = conn
self.pgconn = conn.pgconn
- self.command_queue = Deque[PipelineCommand]()
- self.result_queue = Deque[PendingResult]()
+ self.command_queue = deque[PipelineCommand]()
+ self.result_queue = deque[PendingResult]()
self.level = 0
def __repr__(self) -> str:
from enum import IntEnum, auto
from typing import Any, TYPE_CHECKING
+from collections import deque, OrderedDict
from collections.abc import Sequence
-from collections import OrderedDict
from . import pq
from .abc import PQGen
-from ._compat import Deque, TypeAlias
+from ._compat import TypeAlias
from ._queries import PostgresQuery
if TYPE_CHECKING:
# Counter to generate prepared statements names
self._prepared_idx = 0
- self._to_flush = Deque["bytes | None"]()
+ self._to_flush = deque["bytes | None"]()
@staticmethod
def key(query: PostgresQuery) -> Key:
import logging
from datetime import timezone, tzinfo
+from zoneinfo import ZoneInfo
from .pq.abc import PGconn
-from ._compat import ZoneInfo
logger = logging.getLogger("psycopg")
if True: # ASYNC
import sys
import asyncio
- from asyncio import Lock
- from ._compat import to_thread
+ from asyncio import Lock, to_thread
else:
from threading import Lock
import logging
from time import monotonic
+from collections import deque
from . import pq
from . import errors as e
from .abc import Buffer, PipelineCommand, PQGen, PQGenConn
from .pq.abc import PGcancelConn, PGconn, PGresult
from .waiting import Wait, Ready
-from ._compat import Deque
from ._cmodule import _psycopg
from ._encodings import conninfo_encoding
def _pipeline_communicate(
- pgconn: PGconn, commands: Deque[PipelineCommand]
+ pgconn: PGconn, commands: deque[PipelineCommand]
) -> PQGen[list[list[PGresult]]]:
"""Generator to send queries from a connection in pipeline mode while also
receiving results.
import ctypes.util
from typing import NamedTuple
from pathlib import Path
+from functools import cache
from . import abc
from ._enums import ConnStatus, TransactionStatus, PipelineStatus
-from .._compat import cache
logger = logging.getLogger("psycopg.pq")
import struct
from math import prod
from typing import Any, cast, Callable
+from functools import cache
from .. import pq
from .. import errors as e
from ..abc import AdaptContext, Buffer, Dumper, DumperKey, NoneType, Loader, Transformer
from ..adapt import RecursiveDumper, RecursiveLoader, PyFormat
from .._oids import TEXT_OID, INVALID_OID, TEXT_ARRAY_OID
-from .._compat import cache
from .._struct import pack_len, unpack_len
from .._cmodule import _psycopg
from .._typeinfo import TypeInfo
import re
import struct
from typing import Any, Callable, cast, NamedTuple, TYPE_CHECKING
+from functools import cache
from collections import namedtuple
from collections.abc import Iterator, Sequence
from .. import postgres
from ..adapt import Transformer, PyFormat, RecursiveDumper, Loader, Dumper, Buffer
from .._oids import TEXT_OID
-from .._compat import cache
from .._struct import pack_len, unpack_len
from .._typeinfo import TypeInfo
from .._encodings import _as_python_identifier
from enum import Enum
from typing import Any, Generic, cast, TYPE_CHECKING
+from functools import cache
from collections.abc import Mapping, Sequence
from .. import sql
from ..pq import Format
from ..abc import AdaptContext, Query
from ..adapt import Buffer, Dumper, Loader
-from .._compat import cache, TypeAlias, TypeVar
+from .._compat import TypeAlias, TypeVar
from .._encodings import conn_encoding
from .._typeinfo import TypeInfo
from __future__ import annotations
import re
+from functools import cache
from .. import errors as e
from .. import postgres
from ..abc import Buffer, AdaptContext
from .._oids import TEXT_OID
from ..adapt import PyFormat, RecursiveDumper, RecursiveLoader
-from .._compat import cache, TypeAlias
+from .._compat import TypeAlias
from .._typeinfo import TypeInfo
_re_escape = re.compile(r'(["\\])')
import json
from typing import Any, Callable
+from functools import cache
from .. import abc
from .. import _oids
from ..pq import Format
from ..adapt import Buffer, Dumper, Loader, PyFormat, AdaptersMap
from ..errors import DataError
-from .._compat import cache, TypeAlias
+from .._compat import TypeAlias
JsonDumpsFunction: TypeAlias = Callable[[Any], "str | bytes"]
JsonLoadsFunction: TypeAlias = Callable[["str | bytes"], Any]
from decimal import Decimal
from typing import Any, Generic, overload, TYPE_CHECKING
from datetime import date, datetime
+from functools import cache
from collections.abc import Iterable, MutableSequence
from .. import sql
from ..abc import AdaptContext, Buffer, Dumper, DumperKey, Query
from ..adapt import RecursiveDumper, RecursiveLoader, PyFormat
from .._oids import INVALID_OID, TEXT_OID
-from .._compat import cache
from .._struct import pack_len, unpack_len
from .._typeinfo import TypeInfo, TypesRegistry
from typing import Any, Generic, cast, TYPE_CHECKING
from decimal import Decimal
from datetime import date, datetime
+from functools import cache
from .. import sql
from .. import _oids
from ..abc import AdaptContext, Buffer, Dumper, DumperKey, DumpFunc, LoadFunc, Query
from ..adapt import RecursiveDumper, RecursiveLoader, PyFormat
from .._oids import INVALID_OID, TEXT_OID
-from .._compat import cache, TypeVar
+from .._compat import TypeVar
from .._struct import pack_len, unpack_len
from .._typeinfo import TypeInfo, TypesRegistry
from __future__ import annotations
+from functools import cache
+
from .. import postgres
from ..abc import AdaptContext, Buffer
from ..adapt import Dumper, Loader
from ..pq import Format
-from .._compat import cache
from .._typeinfo import TypeInfo
try:
from __future__ import annotations
from typing import Any, Sequence
+from collections import deque
from psycopg import pq, abc, BaseConnection
from psycopg.rows import Row, RowMaker
from psycopg.adapt import AdaptersMap, PyFormat
from psycopg.pq.abc import PGcancelConn, PGconn, PGresult
-from psycopg._compat import Deque
class Transformer(abc.AdaptContext):
types: tuple[int, ...] | None
def fetch_many(pgconn: PGconn) -> abc.PQGen[list[PGresult]]: ...
def fetch(pgconn: PGconn) -> abc.PQGen[PGresult | None]: ...
def pipeline_communicate(
- pgconn: PGconn, commands: Deque[abc.PipelineCommand]
+ pgconn: PGconn, commands: deque[abc.PipelineCommand]
) -> abc.PQGen[list[list[PGresult]]]: ...
def wait_c(
gen: abc.PQGen[abc.RV], fileno: int, interval: float | None = None
from cpython.object cimport PyObject_CallFunctionObjArgs
from time import monotonic
+from collections import deque
from psycopg import errors as e
from psycopg.pq import abc
from psycopg.abc import PipelineCommand, PQGen
from psycopg._enums import Wait, Ready
-from psycopg._compat import Deque
from psycopg._encodings import conninfo_encoding
cdef object WAIT_W = Wait.W
def pipeline_communicate(
- pq.PGconn pgconn, commands: Deque[PipelineCommand]
+ pq.PGconn pgconn, commands: deque[PipelineCommand]
) -> PQGen[list[list[PGresult]]]:
"""Generator to send queries from a connection in pipeline mode while also
receiving results.
cdef int *_uspad
from datetime import date, time, timedelta, datetime, timezone
+from zoneinfo import ZoneInfo
from psycopg_c._psycopg cimport endian
from psycopg import errors as e
-from psycopg._compat import ZoneInfo
# Initialise the datetime C API
from __future__ import annotations
import sys
-from collections import Counter, deque as Deque
if sys.version_info >= (3, 10):
from typing import TypeAlias
import psycopg.errors as e
__all__ = [
- "Counter",
- "Deque",
"Self",
"TypeAlias",
"TypeVar",
from time import monotonic
from random import random
from typing import Any, TYPE_CHECKING
+from collections import Counter, deque
from psycopg import errors as e
from .errors import PoolClosed
-from ._compat import Counter, Deque
if TYPE_CHECKING:
from psycopg._connection_base import BaseConnection
_CONNECTIONS_ERRORS = "connections_errors"
_CONNECTIONS_LOST = "connections_lost"
- _pool: Deque[Any]
+ _pool: deque[Any]
def __init__(
self,
self.num_workers = num_workers
self._nconns = min_size # currently in the pool, out, being prepared
- self._pool = Deque()
+ self._pool = deque()
self._stats = Counter[str]()
# Min number of connections in the pool in a max_idle unit of time.
from time import monotonic
from types import TracebackType
from typing import Any, cast, Generic
+from collections import deque
from collections.abc import Iterator
from weakref import ref
from contextlib import contextmanager
from .abc import CT, ConnectionCB, ConnectFailedCB
from .base import AttemptWithBackoff, BasePool
from .errors import PoolClosed, PoolTimeout, TooManyRequests
-from ._compat import Deque, Self
+from ._compat import Self
from ._acompat import Condition, Event, Lock, Queue, Worker, spawn, gather
from ._acompat import sleep, current_thread_name
from .sched import Scheduler
class ConnectionPool(Generic[CT], BasePool):
- _pool: Deque[CT]
+ _pool: deque[CT]
def __init__(
self,
self._sched: Scheduler
self._tasks: Queue[MaintenanceTask]
- self._waiting = Deque[WaitingClient[CT]]()
+ self._waiting = deque[WaitingClient[CT]]()
# to notify that the pool is full
self._pool_full_event: Event | None = None
from time import monotonic
from types import TracebackType
from typing import Any, cast, Generic
+from collections import deque
from collections.abc import AsyncIterator
from weakref import ref
from contextlib import asynccontextmanager
from .abc import ACT, AsyncConnectionCB, AsyncConnectFailedCB
from .base import AttemptWithBackoff, BasePool
from .errors import PoolClosed, PoolTimeout, TooManyRequests
-from ._compat import Deque, Self
+from ._compat import Self
from ._acompat import ACondition, AEvent, ALock, AQueue, AWorker, aspawn, agather
from ._acompat import asleep, current_task_name
from .sched_async import AsyncScheduler
class AsyncConnectionPool(Generic[ACT], BasePool):
- _pool: Deque[ACT]
+ _pool: deque[ACT]
def __init__(
self,
self._sched: AsyncScheduler
self._tasks: AQueue[MaintenanceTask]
- self._waiting = Deque[WaitingClient[ACT]]()
+ self._waiting = deque[WaitingClient[ACT]]()
# to notify that the pool is full
self._pool_full_event: AEvent | None = None
import sys
import pytest
import logging
+from functools import cache
from contextlib import contextmanager
import psycopg
from psycopg import pq
from psycopg import sql
from psycopg.conninfo import conninfo_to_dict, make_conninfo
-from psycopg._compat import cache
from psycopg.pq._debug import PGconnDebug
from .utils import check_postgres_version
from typing import Any
from decimal import Decimal
from contextlib import contextmanager, asynccontextmanager
+from collections import deque
import pytest
import psycopg
from psycopg import sql
from psycopg.adapt import PyFormat
-from psycopg._compat import Deque
from psycopg.types.range import Range
from psycopg.types.json import Json, Jsonb
from psycopg.types.numeric import Int4, Int8
def deep_import(name):
- parts = Deque(name.split("."))
+ parts = deque(name.split("."))
seen = []
if not parts:
raise ValueError("name must be a dot-separated name")
from contextlib import contextmanager
from functools import partial
from typing import Any
+from collections import deque
from collections.abc import Iterator, Sequence
from psycopg import AsyncConnection, Connection
from psycopg.abc import PipelineCommand
from psycopg.generators import pipeline_communicate
from psycopg.pq import Format, DiagnosticField
-from psycopg._compat import Deque
psycopg_logger = logging.getLogger("psycopg")
pipeline_logger = logging.getLogger("pipeline")
@contextmanager
def prepare_pipeline_demo_pq(
pgconn: LoggingPGconn, rows_to_send: int, logger: logging.Logger
-) -> Iterator[tuple[Deque[PipelineCommand], Deque[str]]]:
+) -> Iterator[tuple[deque[PipelineCommand], deque[str]]]:
"""Set up pipeline demo with initial queries and yield commands and
results queue for pipeline_communicate().
"""
),
]
- commands = Deque[PipelineCommand]()
- results_queue = Deque[str]()
+ commands = deque[PipelineCommand]()
+ results_queue = deque[str]()
for qname, query in setup_queries:
if qname == "prepare":
import datetime as dt
import pytest
+from zoneinfo import ZoneInfo
from psycopg import DataError, pq, sql
from psycopg.adapt import PyFormat
-from psycopg._compat import ZoneInfo
crdb_skip_datestyle = pytest.mark.crdb("skip", reason="set datestyle/intervalstyle")
crdb_skip_negative_interval = pytest.mark.crdb("skip", reason="negative interval")