# Copyright (C) 2022 The Psycopg Team
+from __future__ import annotations
+
import logging
from typing import Any, cast, Dict, Optional, overload, Tuple, Type
class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]):
@overload
def __init__(
- self: "NullConnectionPool[Connection[TupleRow]]",
+ self: NullConnectionPool[Connection[TupleRow]],
conninfo: str = "",
*,
open: bool = ...,
@overload
def __init__(
- self: "NullConnectionPool[CT]",
+ self: NullConnectionPool[CT],
conninfo: str = "",
*,
open: bool = ...,
# Copyright (C) 2022 The Psycopg Team
+from __future__ import annotations
+
import logging
from typing import Any, cast, Dict, Optional, overload, Type
class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT]):
@overload
def __init__(
- self: "AsyncNullConnectionPool[AsyncConnection[TupleRow]]",
+ self: AsyncNullConnectionPool[AsyncConnection[TupleRow]],
conninfo: str = "",
*,
open: bool = ...,
@overload
def __init__(
- self: "AsyncNullConnectionPool[ACT]",
+ self: AsyncNullConnectionPool[ACT],
conninfo: str = "",
*,
open: bool = ...,
# Copyright (C) 2021 The Psycopg Team
+from __future__ import annotations
+
import logging
import threading
from abc import ABC, abstractmethod
@overload
def __init__(
- self: "ConnectionPool[Connection[TupleRow]]",
+ self: ConnectionPool[Connection[TupleRow]],
conninfo: str = "",
*,
open: bool = ...,
@overload
def __init__(
- self: "ConnectionPool[CT]",
+ self: ConnectionPool[CT],
conninfo: str = "",
*,
open: bool = ...,
self._reconnect_failed = reconnect_failed
self._lock = Lock()
- self._waiting = Deque["WaitingClient[CT]"]()
+ self._waiting = Deque[WaitingClient[CT]]()
# to notify that the pool is full
self._pool_full_event: Optional[Event] = None
self._sched = Scheduler()
self._sched_runner: Optional[threading.Thread] = None
- self._tasks: "Queue[MaintenanceTask]" = Queue()
+ self._tasks: Queue[MaintenanceTask] = Queue()
self._workers: List[threading.Thread] = []
super().__init__(
def _stop_workers(
self,
- waiting_clients: Sequence["WaitingClient[CT]"] = (),
+ waiting_clients: Sequence[WaitingClient[CT]] = (),
connections: Sequence[CT] = (),
timeout: float | None = None,
) -> None:
self._reconnect_failed(self)
- def run_task(self, task: "MaintenanceTask") -> None:
+ def run_task(self, task: MaintenanceTask) -> None:
"""Run a maintenance task in a worker thread."""
self._tasks.put_nowait(task)
- def schedule_task(self, task: "MaintenanceTask", delay: float) -> None:
+ def schedule_task(self, task: MaintenanceTask, delay: float) -> None:
"""Run a maintenance task in a worker thread in the future."""
self._sched.enter(delay, task.tick)
@classmethod
- def worker(cls, q: "Queue[MaintenanceTask]") -> None:
+ def worker(cls, q: Queue[MaintenanceTask]) -> None:
"""Runner to execute pending maintenance task.
The function is designed to run as a separate thread.
class MaintenanceTask(ABC):
"""A task to run asynchronously to maintain the pool state."""
- def __init__(self, pool: "ConnectionPool[Any]"):
+ def __init__(self, pool: ConnectionPool[Any]):
self.pool = ref(pool)
def __repr__(self) -> str:
pool.run_task(self)
@abstractmethod
- def _run(self, pool: "ConnectionPool[Any]") -> None:
+ def _run(self, pool: ConnectionPool[Any]) -> None:
...
class StopWorker(MaintenanceTask):
"""Signal the maintenance thread to terminate."""
- def _run(self, pool: "ConnectionPool[Any]") -> None:
+ def _run(self, pool: ConnectionPool[Any]) -> None:
pass
class AddConnection(MaintenanceTask):
def __init__(
self,
- pool: "ConnectionPool[Any]",
- attempt: Optional["ConnectionAttempt"] = None,
+ pool: ConnectionPool[Any],
+ attempt: Optional[ConnectionAttempt] = None,
growing: bool = False,
):
super().__init__(pool)
self.attempt = attempt
self.growing = growing
- def _run(self, pool: "ConnectionPool[Any]") -> None:
+ def _run(self, pool: ConnectionPool[Any]) -> None:
pool._add_connection(self.attempt, growing=self.growing)
class ReturnConnection(MaintenanceTask):
"""Clean up and return a connection to the pool."""
- def __init__(self, pool: "ConnectionPool[Any]", conn: CT):
+ def __init__(self, pool: ConnectionPool[Any], conn: CT):
super().__init__(pool)
self.conn = conn
- def _run(self, pool: "ConnectionPool[Any]") -> None:
+ def _run(self, pool: ConnectionPool[Any]) -> None:
pool._return_connection(self.conn)
in the pool.
"""
- def _run(self, pool: "ConnectionPool[Any]") -> None:
+ def _run(self, pool: ConnectionPool[Any]) -> None:
# Reschedule the task now so that in case of any error we don't lose
# the periodic run.
pool.schedule_task(self, pool.max_idle)
# Copyright (C) 2021 The Psycopg Team
+from __future__ import annotations
+
import asyncio
import logging
from abc import ABC, abstractmethod
@overload
def __init__(
- self: "AsyncConnectionPool[AsyncConnection[TupleRow]]",
+ self: AsyncConnectionPool[AsyncConnection[TupleRow]],
conninfo: str = "",
*,
open: bool = ...,
@overload
def __init__(
- self: "AsyncConnectionPool[ACT]",
+ self: AsyncConnectionPool[ACT],
conninfo: str = "",
*,
open: bool = ...,
# asyncio objects, created on open to attach them to the right loop.
self._lock: ALock
self._sched: AsyncScheduler
- self._tasks: AQueue["MaintenanceTask"]
+ self._tasks: AQueue[MaintenanceTask]
- self._waiting = Deque["AsyncClient[ACT]"]()
+ self._waiting = Deque[AsyncClient[ACT]]()
# to notify that the pool is full
self._pool_full_event: Optional[AEvent] = None
async def _stop_workers(
self,
- waiting_clients: Sequence["AsyncClient[ACT]"] = (),
+ waiting_clients: Sequence[AsyncClient[ACT]] = (),
connections: Sequence[ACT] = (),
timeout: float | None = None,
) -> None:
else:
self._reconnect_failed(self)
- def run_task(self, task: "MaintenanceTask") -> None:
+ def run_task(self, task: MaintenanceTask) -> None:
"""Run a maintenance task in a worker."""
self._tasks.put_nowait(task)
- async def schedule_task(self, task: "MaintenanceTask", delay: float) -> None:
+ async def schedule_task(self, task: MaintenanceTask, delay: float) -> None:
"""Run a maintenance task in a worker in the future."""
await self._sched.enter(delay, task.tick)
@classmethod
- async def worker(cls, q: AQueue["MaintenanceTask"]) -> None:
+ async def worker(cls, q: AQueue[MaintenanceTask]) -> None:
"""Runner to execute pending maintenance task.
The function is designed to run as a task.
class MaintenanceTask(ABC):
"""A task to run asynchronously to maintain the pool state."""
- def __init__(self, pool: "AsyncConnectionPool[Any]"):
+ def __init__(self, pool: AsyncConnectionPool[Any]):
self.pool = ref(pool)
def __repr__(self) -> str:
pool.run_task(self)
@abstractmethod
- async def _run(self, pool: "AsyncConnectionPool[Any]") -> None:
+ async def _run(self, pool: AsyncConnectionPool[Any]) -> None:
...
class StopWorker(MaintenanceTask):
"""Signal the maintenance worker to terminate."""
- async def _run(self, pool: "AsyncConnectionPool[Any]") -> None:
+ async def _run(self, pool: AsyncConnectionPool[Any]) -> None:
pass
class AddConnection(MaintenanceTask):
def __init__(
self,
- pool: "AsyncConnectionPool[Any]",
- attempt: Optional["ConnectionAttempt"] = None,
+ pool: AsyncConnectionPool[Any],
+ attempt: Optional[ConnectionAttempt] = None,
growing: bool = False,
):
super().__init__(pool)
self.attempt = attempt
self.growing = growing
- async def _run(self, pool: "AsyncConnectionPool[Any]") -> None:
+ async def _run(self, pool: AsyncConnectionPool[Any]) -> None:
await pool._add_connection(self.attempt, growing=self.growing)
class ReturnConnection(MaintenanceTask):
"""Clean up and return a connection to the pool."""
- def __init__(self, pool: "AsyncConnectionPool[Any]", conn: ACT):
+ def __init__(self, pool: AsyncConnectionPool[Any], conn: ACT):
super().__init__(pool)
self.conn = conn
- async def _run(self, pool: "AsyncConnectionPool[Any]") -> None:
+ async def _run(self, pool: AsyncConnectionPool[Any]) -> None:
await pool._return_connection(self.conn)
in the pool.
"""
- async def _run(self, pool: "AsyncConnectionPool[Any]") -> None:
+ async def _run(self, pool: AsyncConnectionPool[Any]) -> None:
# Reschedule the task now so that in case of any error we don't lose
# the periodic run.
await pool.schedule_task(self, pool.max_idle)
"""
def __init__(
- self,
- pool: "AsyncConnectionPool[Any]",
- task: MaintenanceTask,
- delay: float,
+ self, pool: AsyncConnectionPool[Any], task: MaintenanceTask, delay: float
):
super().__init__(pool)
self.task = task
self.delay = delay
- async def _run(self, pool: "AsyncConnectionPool[Any]") -> None:
+ async def _run(self, pool: AsyncConnectionPool[Any]) -> None:
await pool.schedule_task(self.task, self.delay)