import random
import logging
import threading
+from abc import ABC, abstractmethod
from queue import Queue, Empty
from typing import Any, Callable, Deque, Dict, Iterator, List, Optional
from contextlib import contextmanager
# Run the task. Make sure don't die in the attempt.
try:
- task()
+ task.run()
except Exception as e:
logger.warning(
"task run %s failed: %s: %s", task, e.__class__.__name__, e
self.event.set()
-class MaintenanceTask:
+class MaintenanceTask(ABC):
+ """A task run asynchronously to maintain the pool state."""
+
def __init__(self, pool: ConnectionPool):
self.pool = pool
logger.debug("task created: %s", self)
- def __call__(self) -> None:
- logger.debug("task running: %s", self)
-
def __repr__(self) -> str:
return (
f"<{self.__class__.__name__} {self.pool.name!r} at 0x{id(self):x}>"
)
+ def run(self) -> None:
+ logger.debug("task running: %s", self)
+ self._run()
+
+ @abstractmethod
+ def _run(self) -> None:
+ ...
+
class StopWorker(MaintenanceTask):
"""Signal the maintenance thread to terminate."""
+ def _run(self) -> None:
+ pass
+
class TopUpConnections(MaintenanceTask):
"""Increase the number of connections in the pool to the desired number."""
- def __call__(self) -> None:
- super().__call__()
-
+ def _run(self) -> None:
with self.pool._lock:
# Check if there are new connections to create. If there are
# update the number of connections managed immediately and in
class AddConnection(MaintenanceTask):
"""Add a new connection into to the pool."""
- def __call__(self) -> None:
- super().__call__()
-
+ def _run(self) -> None:
conn = self.pool._connect()
conn._pool = self.pool # make it accepted by putconn
self.pool.putconn(conn)
super().__init__(pool)
self.conn = conn
- def __call__(self) -> None:
- super().__call__()
+ def _run(self) -> None:
self.pool._return_connection(self.conn)