from .pool import ConnectionPool
from .pool_async import AsyncConnectionPool
from psycopg import Connection, AsyncConnection # noqa: F401
+ from psycopg.rows import TupleRow # noqa: F401
# Connection types to make the pool generic
-CT = TypeVar("CT", bound="Connection[Any]")
-ACT = TypeVar("ACT", bound="AsyncConnection[Any]")
+CT = TypeVar("CT", bound="Connection[Any]", default="Connection[TupleRow]")
+ACT = TypeVar("ACT", bound="AsyncConnection[Any]", default="AsyncConnection[TupleRow]")
# Callbacks taking a connection from the pool
ConnectionCB: TypeAlias = Callable[[CT], None]
from __future__ import annotations
import logging
-from typing import Any, cast, Dict, Optional, overload, Type
+from typing import Any, cast, Dict, Optional, Type
from psycopg import Connection
from psycopg.pq import TransactionStatus
-from psycopg.rows import TupleRow
from .abc import CT, ConnectionCB, ConnectFailedCB
from .errors import PoolTimeout, TooManyRequests
class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]):
- @overload
- def __init__(
- self: NullConnectionPool[Connection[TupleRow]],
- conninfo: str = "",
- *,
- kwargs: Optional[Dict[str, Any]] = ...,
- min_size: int = ...,
- max_size: Optional[int] = ...,
- open: bool | None = ...,
- configure: Optional[ConnectionCB[CT]] = ...,
- check: Optional[ConnectionCB[CT]] = ...,
- reset: Optional[ConnectionCB[CT]] = ...,
- name: Optional[str] = ...,
- timeout: float = ...,
- max_waiting: int = ...,
- max_lifetime: float = ...,
- max_idle: float = ...,
- reconnect_timeout: float = ...,
- reconnect_failed: Optional[ConnectFailedCB] = ...,
- num_workers: int = ...,
- ):
- ...
-
- @overload
- def __init__(
- self: NullConnectionPool[CT],
- conninfo: str = "",
- *,
- connection_class: Type[CT] = ...,
- kwargs: Optional[Dict[str, Any]] = ...,
- min_size: int = ...,
- max_size: Optional[int] = ...,
- open: bool | None = ...,
- configure: Optional[ConnectionCB[CT]] = ...,
- check: Optional[ConnectionCB[CT]] = ...,
- reset: Optional[ConnectionCB[CT]] = ...,
- name: Optional[str] = ...,
- timeout: float = ...,
- max_waiting: int = ...,
- max_lifetime: float = ...,
- max_idle: float = ...,
- reconnect_timeout: float = ...,
- reconnect_failed: Optional[ConnectFailedCB] = ...,
- num_workers: int = ...,
- ):
- ...
-
def __init__(
self,
conninfo: str = "",
from __future__ import annotations
import logging
-from typing import Any, cast, Dict, Optional, overload, Type
+from typing import Any, cast, Dict, Optional, Type
from psycopg import AsyncConnection
from psycopg.pq import TransactionStatus
-from psycopg.rows import TupleRow
from .abc import ACT, AsyncConnectionCB, AsyncConnectFailedCB
from .errors import PoolTimeout, TooManyRequests
class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT]):
- @overload
- def __init__(
- self: AsyncNullConnectionPool[AsyncConnection[TupleRow]],
- conninfo: str = "",
- *,
- kwargs: Optional[Dict[str, Any]] = ...,
- min_size: int = ...,
- max_size: Optional[int] = ...,
- open: bool | None = ...,
- configure: Optional[AsyncConnectionCB[ACT]] = ...,
- check: Optional[AsyncConnectionCB[ACT]] = ...,
- reset: Optional[AsyncConnectionCB[ACT]] = ...,
- name: Optional[str] = ...,
- timeout: float = ...,
- max_waiting: int = ...,
- max_lifetime: float = ...,
- max_idle: float = ...,
- reconnect_timeout: float = ...,
- reconnect_failed: Optional[AsyncConnectFailedCB] = ...,
- num_workers: int = ...,
- ):
- ...
-
- @overload
- def __init__(
- self: AsyncNullConnectionPool[ACT],
- conninfo: str = "",
- *,
- connection_class: Type[ACT] = ...,
- kwargs: Optional[Dict[str, Any]] = ...,
- min_size: int = ...,
- max_size: Optional[int] = ...,
- open: bool | None = ...,
- configure: Optional[AsyncConnectionCB[ACT]] = ...,
- check: Optional[AsyncConnectionCB[ACT]] = ...,
- reset: Optional[AsyncConnectionCB[ACT]] = ...,
- name: Optional[str] = ...,
- timeout: float = ...,
- max_waiting: int = ...,
- max_lifetime: float = ...,
- max_idle: float = ...,
- reconnect_timeout: float = ...,
- reconnect_failed: Optional[AsyncConnectFailedCB] = ...,
- num_workers: int = ...,
- ):
- ...
-
def __init__(
self,
conninfo: str = "",
from time import monotonic
from types import TracebackType
from typing import Any, Iterator, cast, Dict, Generic, List
-from typing import Optional, overload, Sequence, Type
+from typing import Optional, Sequence, Type
from weakref import ref
from contextlib import contextmanager
from psycopg import errors as e
from psycopg import Connection
from psycopg.pq import TransactionStatus
-from psycopg.rows import TupleRow
from .abc import CT, ConnectionCB, ConnectFailedCB
from .base import ConnectionAttempt, BasePool
class ConnectionPool(Generic[CT], BasePool):
_pool: Deque[CT]
- @overload
- def __init__(
- self: ConnectionPool[Connection[TupleRow]],
- conninfo: str = "",
- *,
- kwargs: Optional[Dict[str, Any]] = ...,
- min_size: int = ...,
- max_size: Optional[int] = ...,
- open: bool | None = ...,
- configure: Optional[ConnectionCB[CT]] = ...,
- check: Optional[ConnectionCB[CT]] = ...,
- reset: Optional[ConnectionCB[CT]] = ...,
- name: Optional[str] = ...,
- timeout: float = ...,
- max_waiting: int = ...,
- max_lifetime: float = ...,
- max_idle: float = ...,
- reconnect_timeout: float = ...,
- reconnect_failed: Optional[ConnectFailedCB] = ...,
- num_workers: int = ...,
- ):
- ...
-
- @overload
- def __init__(
- self: ConnectionPool[CT],
- conninfo: str = "",
- *,
- connection_class: Type[CT] = ...,
- kwargs: Optional[Dict[str, Any]] = ...,
- min_size: int = ...,
- max_size: Optional[int] = ...,
- open: bool | None = ...,
- configure: Optional[ConnectionCB[CT]] = ...,
- check: Optional[ConnectionCB[CT]] = ...,
- reset: Optional[ConnectionCB[CT]] = ...,
- name: Optional[str] = ...,
- timeout: float = ...,
- max_waiting: int = ...,
- max_lifetime: float = ...,
- max_idle: float = ...,
- reconnect_timeout: float = ...,
- reconnect_failed: Optional[ConnectFailedCB] = ...,
- num_workers: int = ...,
- ):
- ...
-
def __init__(
self,
conninfo: str = "",
from time import monotonic
from types import TracebackType
from typing import Any, AsyncIterator, cast, Dict, Generic, List
-from typing import Optional, overload, Sequence, Type
+from typing import Optional, Sequence, Type
from weakref import ref
from contextlib import asynccontextmanager
from psycopg import errors as e
from psycopg import AsyncConnection
from psycopg.pq import TransactionStatus
-from psycopg.rows import TupleRow
from .abc import ACT, AsyncConnectionCB, AsyncConnectFailedCB
from .base import ConnectionAttempt, BasePool
class AsyncConnectionPool(Generic[ACT], BasePool):
_pool: Deque[ACT]
- @overload
- def __init__(
- self: AsyncConnectionPool[AsyncConnection[TupleRow]],
- conninfo: str = "",
- *,
- kwargs: Optional[Dict[str, Any]] = ...,
- min_size: int = ...,
- max_size: Optional[int] = ...,
- open: bool | None = ...,
- configure: Optional[AsyncConnectionCB[ACT]] = ...,
- check: Optional[AsyncConnectionCB[ACT]] = ...,
- reset: Optional[AsyncConnectionCB[ACT]] = ...,
- name: Optional[str] = ...,
- timeout: float = ...,
- max_waiting: int = ...,
- max_lifetime: float = ...,
- max_idle: float = ...,
- reconnect_timeout: float = ...,
- reconnect_failed: Optional[AsyncConnectFailedCB] = ...,
- num_workers: int = ...,
- ):
- ...
-
- @overload
- def __init__(
- self: AsyncConnectionPool[ACT],
- conninfo: str = "",
- *,
- connection_class: Type[ACT] = ...,
- kwargs: Optional[Dict[str, Any]] = ...,
- min_size: int = ...,
- max_size: Optional[int] = ...,
- open: bool | None = ...,
- configure: Optional[AsyncConnectionCB[ACT]] = ...,
- check: Optional[AsyncConnectionCB[ACT]] = ...,
- reset: Optional[AsyncConnectionCB[ACT]] = ...,
- name: Optional[str] = ...,
- timeout: float = ...,
- max_waiting: int = ...,
- max_lifetime: float = ...,
- max_idle: float = ...,
- reconnect_timeout: float = ...,
- reconnect_failed: Optional[AsyncConnectFailedCB] = ...,
- num_workers: int = ...,
- ):
- ...
-
def __init__(
self,
conninfo: str = "",
with p.connection() as conn:
assert inits == 1
res = conn.execute("show default_transaction_read_only")
- assert res.fetchone()[0] == "on" # type: ignore[index]
+ assert res.fetchone()[0] == "on"
with p.connection() as conn:
assert inits == 1
res = conn.execute("show default_transaction_read_only")
- assert res.fetchone()[0] == "on" # type: ignore[index]
+ assert res.fetchone()[0] == "on"
conn.close()
with p.connection() as conn:
assert inits == 2
res = conn.execute("show default_transaction_read_only")
- assert res.fetchone()[0] == "on" # type: ignore[index]
+ assert res.fetchone()[0] == "on"
def test_reset(dsn):
async with p.connection() as conn:
assert inits == 1
res = await conn.execute("show default_transaction_read_only")
- assert (await res.fetchone())[0] == "on" # type: ignore[index]
+ assert (await res.fetchone())[0] == "on"
async with p.connection() as conn:
assert inits == 1
res = await conn.execute("show default_transaction_read_only")
- assert (await res.fetchone())[0] == "on" # type: ignore[index]
+ assert (await res.fetchone())[0] == "on"
await conn.close()
async with p.connection() as conn:
assert inits == 2
res = await conn.execute("show default_transaction_read_only")
- assert (await res.fetchone())[0] == "on" # type: ignore[index]
+ assert (await res.fetchone())[0] == "on"
async def test_reset(dsn):