import random
import logging
import threading
-from abc import ABC, abstractmethod
from queue import Queue, Empty
from typing import Any, Callable, Deque, Dict, Iterator, List, Optional
-from weakref import ref
from contextlib import contextmanager
from collections import deque
-from .. import errors as e
from ..pq import TransactionStatus
from ..connection import Connection
+from . import tasks
+from .base import ConnectionAttempt
from .sched import Scheduler
+from .errors import PoolClosed, PoolTimeout
logger = logging.getLogger(__name__)
WORKER_TIMEOUT = 60.0
-class PoolTimeout(e.OperationalError):
- pass
-
-
-class PoolClosed(e.OperationalError):
- pass
-
-
class ConnectionPool:
_num_pool = 0
# max_idle interval they weren't all used.
self._nconns_min = minconn
- self._tasks: "Queue[MaintenanceTask]" = Queue()
+ self._tasks: "Queue[tasks.MaintenanceTask]" = Queue()
self._workers: List[threading.Thread] = []
for i in range(num_workers):
t = threading.Thread(
if setup_timeout > 0:
event = threading.Event()
for i in range(self._nconns):
- self.run_task(AddInitialConnection(self, event))
+ self.run_task(tasks.AddInitialConnection(self, event))
# Wait for the pool to be full or throw an error
if not event.wait(timeout=setup_timeout):
)
else:
for i in range(self._nconns):
- self.run_task(AddConnection(self))
+ self.run_task(tasks.AddConnection(self))
# Schedule a task to shrink the pool if connections over minconn have
# remained unused. However if the pool cannot't grow don't bother.
if maxconn > minconn:
- self.schedule_task(ShrinkPool(self), self.max_idle)
+ self.schedule_task(tasks.ShrinkPool(self), self.max_idle)
def __repr__(self) -> str:
return (
logger.info(
"growing pool %r to %s", self.name, self._nconns
)
- self.run_task(AddConnection(self))
+ self.run_task(tasks.AddConnection(self))
# If we are in the waiting queue, wait to be assigned a connection
# (outside the critical section, so only the waiting client is locked)
return
# Use a worker to perform eventual maintenance work in a separate thread
- self.run_task(ReturnConnection(self, conn))
+ self.run_task(tasks.ReturnConnection(self, conn))
@property
def closed(self) -> bool:
# Stop the worker threads
for i in range(len(self._workers)):
- self.run_task(StopWorker(self))
+ self.run_task(tasks.StopWorker(self))
# Signal to eventual clients in the queue that business is closed.
for pos in waiting:
timeout,
)
- def run_task(self, task: "MaintenanceTask") -> None:
+ def run_task(self, task: tasks.MaintenanceTask) -> None:
"""Run a maintenance task in a worker thread."""
self._tasks.put(task)
- def schedule_task(self, task: "MaintenanceTask", delay: float) -> None:
+ def schedule_task(self, task: tasks.MaintenanceTask, delay: float) -> None:
"""Run a maintenance task in a worker thread in the future."""
self._sched.enter(delay, task.tick)
@classmethod
- def worker(cls, q: "Queue[MaintenanceTask]") -> None:
+ def worker(cls, q: "Queue[tasks.MaintenanceTask]") -> None:
"""Runner to execute pending maintenance task.
The function is designed to run as a separate thread.
"task run %s failed: %s: %s", task, e.__class__.__name__, e
)
- if isinstance(task, StopWorker):
+ if isinstance(task, tasks.StopWorker):
return
def configure(self, conn: Connection) -> None:
if trigger_event:
event.set()
- def _add_connection(self, attempt: Optional["ConnectionAttempt"]) -> None:
+ def _add_connection(self, attempt: Optional[ConnectionAttempt]) -> None:
"""Try to connect and add the connection to the pool.
If failed, reschedule a new attempt in the future for a few times, then
self.reconnect_failed()
else:
attempt.update_delay(now)
- self.schedule_task(AddConnection(self, attempt), attempt.delay)
+ self.schedule_task(
+ tasks.AddConnection(self, attempt), attempt.delay
+ )
else:
self._add_to_pool(conn)
if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
# Connection no more in working state: create a new one.
logger.warning("discarding closed connection: %s", conn)
- self.run_task(AddConnection(self))
+ self.run_task(tasks.AddConnection(self))
else:
self._add_to_pool(conn)
self.error = error
self._cond.notify_all()
return True
-
-
-class ConnectionAttempt:
- """Keep the state of a connection attempt."""
-
- INITIAL_DELAY = 1.0
- DELAY_JITTER = 0.1
- DELAY_BACKOFF = 2.0
-
- def __init__(self, *, reconnect_timeout: float):
- self.reconnect_timeout = reconnect_timeout
- self.delay = 0.0
- self.give_up_at = 0.0
-
- def update_delay(self, now: float) -> None:
- """Calculate how long to wait for a new connection attempt"""
- if self.delay == 0.0:
- self.give_up_at = now + self.reconnect_timeout
- # +/- 10% of the initial delay
- jitter = self.INITIAL_DELAY * (
- (2.0 * self.DELAY_JITTER * random.random()) - self.DELAY_JITTER
- )
- self.delay = self.INITIAL_DELAY + jitter
- else:
- self.delay *= self.DELAY_BACKOFF
-
- if self.delay + now > self.give_up_at:
- self.delay = max(0.0, self.give_up_at - now)
-
- def time_to_give_up(self, now: float) -> bool:
- """Return True if we are tired of trying to connect. Meh."""
- return self.give_up_at > 0.0 and now >= self.give_up_at
-
-
-class MaintenanceTask(ABC):
- """A task to run asynchronously to maintain the pool state."""
-
- def __init__(self, pool: ConnectionPool):
- self.pool = ref(pool)
- logger.debug("task created: %s", self)
-
- def __repr__(self) -> str:
- pool = self.pool()
- name = repr(pool.name) if pool else "<pool is gone>"
- return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>"
-
- def run(self) -> None:
- """Run the task.
-
- This usually happens in a worker thread. Call the concrete _run()
- implementation, if the pool is still alive.
- """
- pool = self.pool()
- if not pool or pool.closed:
- # Pool is no more working. Quietly discard the operation.
- return
-
- logger.debug("task running: %s", self)
- self._run(pool)
-
- def tick(self) -> None:
- """Run the scheduled task
-
- This function is called by the scheduler thread. Use a worker to
- run the task for real in order to free the scheduler immediately.
- """
- pool = self.pool()
- if not pool or pool.closed:
- # Pool is no more working. Quietly discard the operation.
- return
-
- pool.run_task(self)
-
- @abstractmethod
- def _run(self, pool: ConnectionPool) -> None:
- ...
-
-
-class StopWorker(MaintenanceTask):
- """Signal the maintenance thread to terminate."""
-
- def _run(self, pool: ConnectionPool) -> None:
- pass
-
-
-class AddInitialConnection(MaintenanceTask):
- """Add a new connection into to the pool.
-
- If the desired number of connections is reached notify the event.
- """
-
- def __init__(self, pool: ConnectionPool, event: threading.Event):
- super().__init__(pool)
- self.event = event
-
- def _run(self, pool: ConnectionPool) -> None:
- pool._add_initial_connection(self.event)
-
-
-class AddConnection(MaintenanceTask):
- def __init__(
- self, pool: ConnectionPool, attempt: Optional[ConnectionAttempt] = None
- ):
- super().__init__(pool)
- self.attempt = attempt
-
- def _run(self, pool: ConnectionPool) -> None:
- pool._add_connection(self.attempt)
-
-
-class ReturnConnection(MaintenanceTask):
- """Clean up and return a connection to the pool."""
-
- def __init__(self, pool: ConnectionPool, conn: Connection):
- super().__init__(pool)
- self.conn = conn
-
- def _run(self, pool: ConnectionPool) -> None:
- pool._return_connection(self.conn)
-
-
-class ShrinkPool(MaintenanceTask):
- """If the pool can shrink, remove one connection.
-
- Re-schedule periodically and also reset the minimum number of connections
- in the pool.
- """
-
- def _run(self, pool: ConnectionPool) -> None:
- # Reschedule the task now so that in case of any error we don't lose
- # the periodic run.
- pool.schedule_task(self, pool.max_idle)
-
- pool._shrink_if_possible()
--- /dev/null
+"""
+Maintenance tasks for the connection pools.
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+import logging
+import threading
+from abc import ABC, abstractmethod
+from typing import Optional, TYPE_CHECKING
+from weakref import ref
+
+if TYPE_CHECKING:
+ from .base import ConnectionAttempt
+ from .pool import ConnectionPool
+ from ..connection import Connection
+
+logger = logging.getLogger(__name__)
+
+
+class MaintenanceTask(ABC):
+ """A task to run asynchronously to maintain the pool state."""
+
+ def __init__(self, pool: "ConnectionPool"):
+ self.pool = ref(pool)
+ logger.debug("task created: %s", self)
+
+ def __repr__(self) -> str:
+ pool = self.pool()
+ name = repr(pool.name) if pool else "<pool is gone>"
+ return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>"
+
+ def run(self) -> None:
+ """Run the task.
+
+ This usually happens in a worker thread. Call the concrete _run()
+ implementation, if the pool is still alive.
+ """
+ pool = self.pool()
+ if not pool or pool.closed:
+ # Pool is no more working. Quietly discard the operation.
+ return
+
+ logger.debug("task running: %s", self)
+ self._run(pool)
+
+ def tick(self) -> None:
+ """Run the scheduled task
+
+ This function is called by the scheduler thread. Use a worker to
+ run the task for real in order to free the scheduler immediately.
+ """
+ pool = self.pool()
+ if not pool or pool.closed:
+ # Pool is no more working. Quietly discard the operation.
+ return
+
+ pool.run_task(self)
+
+ @abstractmethod
+ def _run(self, pool: "ConnectionPool") -> None:
+ ...
+
+
+class StopWorker(MaintenanceTask):
+ """Signal the maintenance thread to terminate."""
+
+ def _run(self, pool: "ConnectionPool") -> None:
+ pass
+
+
+class AddInitialConnection(MaintenanceTask):
+ """Add a new connection into to the pool.
+
+ If the desired number of connections is reached notify the event.
+ """
+
+ def __init__(self, pool: "ConnectionPool", event: threading.Event):
+ super().__init__(pool)
+ self.event = event
+
+ def _run(self, pool: "ConnectionPool") -> None:
+ pool._add_initial_connection(self.event)
+
+
+class AddConnection(MaintenanceTask):
+ def __init__(
+ self,
+ pool: "ConnectionPool",
+ attempt: Optional["ConnectionAttempt"] = None,
+ ):
+ super().__init__(pool)
+ self.attempt = attempt
+
+ def _run(self, pool: "ConnectionPool") -> None:
+ pool._add_connection(self.attempt)
+
+
+class ReturnConnection(MaintenanceTask):
+ """Clean up and return a connection to the pool."""
+
+ def __init__(self, pool: "ConnectionPool", conn: "Connection"):
+ super().__init__(pool)
+ self.conn = conn
+
+ def _run(self, pool: "ConnectionPool") -> None:
+ pool._return_connection(self.conn)
+
+
+class ShrinkPool(MaintenanceTask):
+ """If the pool can shrink, remove one connection.
+
+ Re-schedule periodically and also reset the minimum number of connections
+ in the pool.
+ """
+
+ def _run(self, pool: "ConnectionPool") -> None:
+ # Reschedule the task now so that in case of any error we don't lose
+ # the periodic run.
+ pool.schedule_task(self, pool.max_idle)
+
+ pool._shrink_if_possible()