if sys.version_info >= (3, 9):
from zoneinfo import ZoneInfo
+ from collections import Counter, deque as Deque
else:
from backports.zoneinfo import ZoneInfo
+ from typing import Counter, Deque
__all__ = [
+ "Counter",
+ "Deque",
"Protocol",
"ZoneInfo",
"asynccontextmanager",
# Copyright (C) 2021 The Psycopg Team
from random import random
-from typing import Any, Callable, Deque, Dict, Generic, Optional
-from typing import TYPE_CHECKING
-from collections import Counter, deque
+from typing import Any, Callable, Dict, Generic, Optional
from psycopg.abc import ConnectionType
-
-if TYPE_CHECKING:
- from typing import Counter as TCounter
+from psycopg._compat import Counter, Deque
class BasePool(Generic[ConnectionType]):
self.num_workers = num_workers
self._nconns = min_size # currently in the pool, out, being prepared
- self._pool: Deque[ConnectionType] = deque()
- self._stats: "TCounter[str]" = Counter()
+ self._pool = Deque[ConnectionType]()
+ self._stats = Counter[str]()
# Min number of connections in the pool in a max_idle unit of time.
# It is reset periodically by the ShrinkPool scheduled task.
from time import monotonic
from queue import Queue, Empty
from types import TracebackType
-from typing import Any, Callable, Deque, Dict, Iterator, List
+from typing import Any, Callable, Dict, Iterator, List
from typing import Optional, Type
from weakref import ref
from contextlib import contextmanager
-from collections import deque
from psycopg import errors as e
from psycopg import Connection
from psycopg.pq import TransactionStatus
+from psycopg._compat import Deque
from .base import ConnectionAttempt, BasePool
from .sched import Scheduler
self._reset = reset
self._lock = threading.RLock()
- self._waiting: Deque["WaitingClient"] = deque()
+ self._waiting = Deque["WaitingClient"]()
# to notify that the pool is full
self._pool_full_event: Optional[threading.Event] = None
# Also disable the warning for open connection in conn.__del__
conn._pool = None
- pos: Optional[WaitingClient] = None
-
# Critical section: if there is a client waiting give it the connection
# otherwise put it back into the pool.
with self._lock:
from abc import ABC, abstractmethod
from time import monotonic
from types import TracebackType
-from typing import Any, AsyncIterator, Awaitable, Callable, Deque
+from typing import Any, AsyncIterator, Awaitable, Callable
from typing import Dict, List, Optional, Type
from weakref import ref
-from collections import deque
from psycopg import errors as e
from psycopg.pq import TransactionStatus
-from psycopg._compat import Task, asynccontextmanager, create_task
+from psycopg._compat import Task, asynccontextmanager, create_task, Deque
from psycopg.connection_async import AsyncConnection
from .base import ConnectionAttempt, BasePool
self._reset = reset
self._lock = asyncio.Lock()
- self._waiting: Deque["AsyncClient"] = deque()
+ self._waiting = Deque["AsyncClient"]()
# to notify that the pool is full
self._pool_full_event: Optional[asyncio.Event] = None
# Also disable the warning for open connection in conn.__del__
conn._pool = None
- pos: Optional[AsyncClient] = None
-
# Critical section: if there is a client waiting give it the connection
# otherwise put it back into the pool.
async with self._lock:
from random import choice, random, randrange
from decimal import Decimal
from contextlib import contextmanager
-from collections import deque
import pytest
import psycopg
from psycopg import sql
from psycopg.adapt import PyFormat
-from psycopg._compat import asynccontextmanager
+from psycopg._compat import asynccontextmanager, Deque
from psycopg.types.range import Range
from psycopg.types.numeric import Int4, Int8
from psycopg.types.multirange import Multirange
def deep_import(name):
- parts = deque(name.split("."))
+ parts = Deque(name.split("."))
seen = []
if not parts:
raise ValueError("name must be a dot-separated name")
import weakref
from time import sleep, time
from threading import Thread, Event
-from collections import Counter
from typing import Any, List, Tuple
import pytest
import psycopg
from psycopg.pq import TransactionStatus
+from psycopg._compat import Counter
pytestmark = []
for retry in retries:
with retry:
with pool.ConnectionPool(dsn, min_size=4) as p:
- counts = Counter() # type: Counter[int]
+ counts = Counter[int]()
for i in range(8):
with p.connection() as conn:
sleep(0.1)
import asyncio
import logging
from time import time
-from collections import Counter
from typing import Any, List, Tuple
import pytest
import psycopg
from psycopg.pq import TransactionStatus
-from psycopg._compat import create_task
+from psycopg._compat import create_task, Counter
pytestmark = [
pytest.mark.asyncio,
async for retry in retries:
with retry:
async with pool.AsyncConnectionPool(dsn, min_size=4) as p:
- counts = Counter() # type: Counter[int]
+ counts = Counter[int]()
for i in range(8):
async with p.connection() as conn:
await asyncio.sleep(0.1)