--- /dev/null
+"""
+Types used in the psycopg_pool package
+"""
+
+# Copyright (C) 2023 The Psycopg Team
+
+from __future__ import annotations
+
+from typing import Any, Awaitable, Callable, TypeVar, Union, TYPE_CHECKING
+
+from typing_extensions import TypeAlias
+
+if TYPE_CHECKING:
+ from .pool import ConnectionPool
+ from .pool_async import AsyncConnectionPool
+ from psycopg import Connection, AsyncConnection
+
+# Connection types to make the pool generic
+CT = TypeVar("CT", bound="Connection[Any]")
+ACT = TypeVar("ACT", bound="AsyncConnection[Any]")
+
+# Callbacks taking a connection from the pool
+ConnectionCB: TypeAlias = Callable[[CT], None]
+AsyncConnectionCB: TypeAlias = Callable[[ACT], Awaitable[None]]
+
+# Callbacks to pass the pool to on connection failure
+ConnectFailedCB: TypeAlias = Callable[["ConnectionPool[Any]"], None]
+AsyncConnectFailedCB: TypeAlias = Union[
+ Callable[["AsyncConnectionPool[Any]"], None],
+ Callable[["AsyncConnectionPool[Any]"], Awaitable[None]],
+]
import logging
import threading
-from typing import Any, Callable, cast, Dict, Optional, overload, Tuple, Type
+from typing import Any, cast, Dict, Optional, overload, Tuple, Type
from psycopg import Connection
from psycopg.pq import TransactionStatus
from psycopg.rows import TupleRow
-from .pool import ConnectionPool, CT, AddConnection, ConnectFailedCB
+from .abc import CT, ConnectionCB, ConnectFailedCB
+from .pool import ConnectionPool, AddConnection
from .errors import PoolTimeout, TooManyRequests
from ._compat import ConnectionTimeout
conninfo: str = "",
*,
open: bool = ...,
- configure: Optional[Callable[[CT], None]] = ...,
- reset: Optional[Callable[[CT], None]] = ...,
+ configure: Optional[ConnectionCB[CT]] = ...,
+ reset: Optional[ConnectionCB[CT]] = ...,
kwargs: Optional[Dict[str, Any]] = ...,
min_size: int = ...,
max_size: Optional[int] = ...,
*,
open: bool = ...,
connection_class: Type[CT],
- configure: Optional[Callable[[CT], None]] = ...,
- reset: Optional[Callable[[CT], None]] = ...,
+ configure: Optional[ConnectionCB[CT]] = ...,
+ reset: Optional[ConnectionCB[CT]] = ...,
kwargs: Optional[Dict[str, Any]] = ...,
min_size: int = ...,
max_size: Optional[int] = ...,
*,
open: bool = True,
connection_class: Type[CT] = cast(Type[CT], Connection),
- configure: Optional[Callable[[CT], None]] = None,
- reset: Optional[Callable[[CT], None]] = None,
+ configure: Optional[ConnectionCB[CT]] = None,
+ reset: Optional[ConnectionCB[CT]] = None,
kwargs: Optional[Dict[str, Any]] = None,
# Note: default value changed to 0.
min_size: int = 0,
import asyncio
import logging
-from typing import Any, Awaitable, Callable, cast, Dict, Optional, overload, Type
+from typing import Any, cast, Dict, Optional, overload, Type
from psycopg import AsyncConnection
from psycopg.pq import TransactionStatus
from psycopg.rows import TupleRow
+from .abc import ACT, AsyncConnectionCB, AsyncConnectFailedCB
from .errors import PoolTimeout, TooManyRequests
from ._compat import ConnectionTimeout
from .null_pool import _BaseNullConnectionPool
-from .pool_async import AsyncConnectionPool, ACT, AddConnection, AsyncConnectFailedCB
+from .pool_async import AsyncConnectionPool, AddConnection
logger = logging.getLogger("psycopg.pool")
conninfo: str = "",
*,
open: bool = ...,
- configure: Optional[Callable[[ACT], Awaitable[None]]] = ...,
- reset: Optional[Callable[[ACT], Awaitable[None]]] = ...,
+ configure: Optional[AsyncConnectionCB[ACT]] = ...,
+ reset: Optional[AsyncConnectionCB[ACT]] = ...,
kwargs: Optional[Dict[str, Any]] = ...,
min_size: int = ...,
max_size: Optional[int] = ...,
*,
open: bool = ...,
connection_class: Type[ACT],
- configure: Optional[Callable[[ACT], Awaitable[None]]] = ...,
- reset: Optional[Callable[[ACT], Awaitable[None]]] = ...,
+ configure: Optional[AsyncConnectionCB[ACT]] = ...,
+ reset: Optional[AsyncConnectionCB[ACT]] = ...,
kwargs: Optional[Dict[str, Any]] = ...,
min_size: int = ...,
max_size: Optional[int] = ...,
*,
open: bool = True,
connection_class: Type[ACT] = cast(Type[ACT], AsyncConnection),
- configure: Optional[Callable[[ACT], Awaitable[None]]] = None,
- reset: Optional[Callable[[ACT], Awaitable[None]]] = None,
+ configure: Optional[AsyncConnectionCB[ACT]] = None,
+ reset: Optional[AsyncConnectionCB[ACT]] = None,
kwargs: Optional[Dict[str, Any]] = None,
# Note: default value changed to 0.
min_size: int = 0,
from time import monotonic
from queue import Queue, Empty
from types import TracebackType
-from typing import Any, Callable, cast, Dict, Generic, Iterator, List
+from typing import Any, cast, Dict, Generic, Iterator, List
from typing import Optional, overload, Sequence, Type, TypeVar
-from typing_extensions import TypeAlias
from weakref import ref
from contextlib import contextmanager
from psycopg.pq import TransactionStatus
from psycopg.rows import TupleRow
+from .abc import CT, ConnectionCB, ConnectFailedCB
from .base import ConnectionAttempt, BasePool
from .sched import Scheduler
from .errors import PoolClosed, PoolTimeout, TooManyRequests
logger = logging.getLogger("psycopg.pool")
-ConnectFailedCB: TypeAlias = Callable[["ConnectionPool"], None]
-
-CT = TypeVar("CT", bound="Connection[Any]")
-
class ConnectionPool(Generic[CT], BasePool):
_Self = TypeVar("_Self", bound="ConnectionPool[CT]")
conninfo: str = "",
*,
open: bool = ...,
- configure: Optional[Callable[[CT], None]] = ...,
- reset: Optional[Callable[[CT], None]] = ...,
+ configure: Optional[ConnectionCB[CT]] = ...,
+ reset: Optional[ConnectionCB[CT]] = ...,
kwargs: Optional[Dict[str, Any]] = ...,
min_size: int = ...,
max_size: Optional[int] = ...,
*,
open: bool = ...,
connection_class: Type[CT],
- configure: Optional[Callable[[CT], None]] = ...,
- reset: Optional[Callable[[CT], None]] = ...,
+ configure: Optional[ConnectionCB[CT]] = ...,
+ reset: Optional[ConnectionCB[CT]] = ...,
kwargs: Optional[Dict[str, Any]] = ...,
min_size: int = ...,
max_size: Optional[int] = ...,
*,
open: bool = True,
connection_class: Type[CT] = cast(Type[CT], Connection[TupleRow]),
- configure: Optional[Callable[[CT], None]] = None,
- reset: Optional[Callable[[CT], None]] = None,
+ configure: Optional[ConnectionCB[CT]] = None,
+ reset: Optional[ConnectionCB[CT]] = None,
kwargs: Optional[Dict[str, Any]] = None,
min_size: int = 4,
max_size: Optional[int] = None,
from abc import ABC, abstractmethod
from time import monotonic
from types import TracebackType
-from typing import Any, AsyncIterator, Awaitable, Callable, cast, Generic
-from typing import Dict, List, Optional, overload, Sequence, Type, TypeVar, Union
-from typing_extensions import TypeAlias
+from typing import Any, AsyncIterator, cast, Generic
+from typing import Dict, List, Optional, overload, Sequence, Type, TypeVar
from asyncio import create_task, Task
from weakref import ref
from contextlib import asynccontextmanager
from psycopg.pq import TransactionStatus
from psycopg.rows import TupleRow
+from .abc import ACT, AsyncConnectionCB, AsyncConnectFailedCB
from .base import ConnectionAttempt, BasePool
from .errors import PoolClosed, PoolTimeout, TooManyRequests
from ._compat import Deque
logger = logging.getLogger("psycopg.pool")
-AsyncConnectFailedCB: TypeAlias = Union[
- Callable[["AsyncConnectionPool"], None],
- Callable[["AsyncConnectionPool"], Awaitable[None]],
-]
-
-ACT = TypeVar("ACT", bound="AsyncConnection[Any]")
-
class AsyncConnectionPool(Generic[ACT], BasePool):
_Self = TypeVar("_Self", bound="AsyncConnectionPool[ACT]")
conninfo: str = "",
*,
open: bool = ...,
- configure: Optional[Callable[[ACT], Awaitable[None]]] = ...,
- reset: Optional[Callable[[ACT], Awaitable[None]]] = ...,
+ configure: Optional[AsyncConnectionCB[ACT]] = ...,
+ reset: Optional[AsyncConnectionCB[ACT]] = ...,
kwargs: Optional[Dict[str, Any]] = ...,
min_size: int = ...,
max_size: Optional[int] = ...,
*,
open: bool = ...,
connection_class: Type[ACT],
- configure: Optional[Callable[[ACT], Awaitable[None]]] = ...,
- reset: Optional[Callable[[ACT], Awaitable[None]]] = ...,
+ configure: Optional[AsyncConnectionCB[ACT]] = ...,
+ reset: Optional[AsyncConnectionCB[ACT]] = ...,
kwargs: Optional[Dict[str, Any]] = ...,
min_size: int = ...,
max_size: Optional[int] = ...,
*,
open: bool = True,
connection_class: Type[ACT] = cast(Type[ACT], AsyncConnection),
- configure: Optional[Callable[[ACT], Awaitable[None]]] = None,
- reset: Optional[Callable[[ACT], Awaitable[None]]] = None,
+ configure: Optional[AsyncConnectionCB[ACT]] = None,
+ reset: Optional[AsyncConnectionCB[ACT]] = None,
kwargs: Optional[Dict[str, Any]] = None,
min_size: int = 4,
max_size: Optional[int] = None,