From: Daniele Varrazzo Date: Thu, 25 Feb 2021 19:36:48 +0000 (+0100) Subject: Split the pool package objects in modules X-Git-Tag: 3.0.dev0~87^2~45 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1cf1bea9db8acad14be79cdfed70699ef82e0a15;p=thirdparty%2Fpsycopg.git Split the pool package objects in modules --- diff --git a/psycopg3/psycopg3/pool/__init__.py b/psycopg3/psycopg3/pool/__init__.py index 222d89d8b..327dcfc86 100644 --- a/psycopg3/psycopg3/pool/__init__.py +++ b/psycopg3/psycopg3/pool/__init__.py @@ -4,6 +4,7 @@ psycopg3 connection pool package # Copyright (C) 2021 The Psycopg Team -from .pool import ConnectionPool, PoolClosed, PoolTimeout +from .pool import ConnectionPool +from .errors import PoolClosed, PoolTimeout __all__ = ["ConnectionPool", "PoolClosed", "PoolTimeout"] diff --git a/psycopg3/psycopg3/pool/base.py b/psycopg3/psycopg3/pool/base.py new file mode 100644 index 000000000..ad76dabc9 --- /dev/null +++ b/psycopg3/psycopg3/pool/base.py @@ -0,0 +1,39 @@ +""" +psycopg3 connection pool base class and functionalities. +""" + +# Copyright (C) 2021 The Psycopg Team + +import random + + +class ConnectionAttempt: + """Keep the state of a connection attempt.""" + + INITIAL_DELAY = 1.0 + DELAY_JITTER = 0.1 + DELAY_BACKOFF = 2.0 + + def __init__(self, *, reconnect_timeout: float): + self.reconnect_timeout = reconnect_timeout + self.delay = 0.0 + self.give_up_at = 0.0 + + def update_delay(self, now: float) -> None: + """Calculate how long to wait for a new connection attempt""" + if self.delay == 0.0: + self.give_up_at = now + self.reconnect_timeout + # +/- 10% of the initial delay + jitter = self.INITIAL_DELAY * ( + (2.0 * self.DELAY_JITTER * random.random()) - self.DELAY_JITTER + ) + self.delay = self.INITIAL_DELAY + jitter + else: + self.delay *= self.DELAY_BACKOFF + + if self.delay + now > self.give_up_at: + self.delay = max(0.0, self.give_up_at - now) + + def time_to_give_up(self, now: float) -> bool: + """Return True if we are tired of trying to connect. Meh.""" + return self.give_up_at > 0.0 and now >= self.give_up_at diff --git a/psycopg3/psycopg3/pool/errors.py b/psycopg3/psycopg3/pool/errors.py new file mode 100644 index 000000000..12f8fa64a --- /dev/null +++ b/psycopg3/psycopg3/pool/errors.py @@ -0,0 +1,19 @@ +""" +Connection pool errors. +""" + +# Copyright (C) 2021 The Psycopg Team + +from .. import errors as e + + +class PoolClosed(e.OperationalError): + """Attempt to get a connection from a closed pool.""" + + __module__ = "psycopg3.pool" + + +class PoolTimeout(e.OperationalError): + """The pool couldn't provide a connection in acceptable time.""" + + __module__ = "psycopg3.pool" diff --git a/psycopg3/psycopg3/pool/pool.py b/psycopg3/psycopg3/pool/pool.py index c544c0662..b6718f9f2 100644 --- a/psycopg3/psycopg3/pool/pool.py +++ b/psycopg3/psycopg3/pool/pool.py @@ -8,32 +8,24 @@ import time import random import logging import threading -from abc import ABC, abstractmethod from queue import Queue, Empty from typing import Any, Callable, Deque, Dict, Iterator, List, Optional -from weakref import ref from contextlib import contextmanager from collections import deque -from .. import errors as e from ..pq import TransactionStatus from ..connection import Connection +from . import tasks +from .base import ConnectionAttempt from .sched import Scheduler +from .errors import PoolClosed, PoolTimeout logger = logging.getLogger(__name__) WORKER_TIMEOUT = 60.0 -class PoolTimeout(e.OperationalError): - pass - - -class PoolClosed(e.OperationalError): - pass - - class ConnectionPool: _num_pool = 0 @@ -95,7 +87,7 @@ class ConnectionPool: # max_idle interval they weren't all used. self._nconns_min = minconn - self._tasks: "Queue[MaintenanceTask]" = Queue() + self._tasks: "Queue[tasks.MaintenanceTask]" = Queue() self._workers: List[threading.Thread] = [] for i in range(num_workers): t = threading.Thread( @@ -121,7 +113,7 @@ class ConnectionPool: if setup_timeout > 0: event = threading.Event() for i in range(self._nconns): - self.run_task(AddInitialConnection(self, event)) + self.run_task(tasks.AddInitialConnection(self, event)) # Wait for the pool to be full or throw an error if not event.wait(timeout=setup_timeout): @@ -131,12 +123,12 @@ class ConnectionPool: ) else: for i in range(self._nconns): - self.run_task(AddConnection(self)) + self.run_task(tasks.AddConnection(self)) # Schedule a task to shrink the pool if connections over minconn have # remained unused. However if the pool cannot't grow don't bother. if maxconn > minconn: - self.schedule_task(ShrinkPool(self), self.max_idle) + self.schedule_task(tasks.ShrinkPool(self), self.max_idle) def __repr__(self) -> str: return ( @@ -206,7 +198,7 @@ class ConnectionPool: logger.info( "growing pool %r to %s", self.name, self._nconns ) - self.run_task(AddConnection(self)) + self.run_task(tasks.AddConnection(self)) # If we are in the waiting queue, wait to be assigned a connection # (outside the critical section, so only the waiting client is locked) @@ -249,7 +241,7 @@ class ConnectionPool: return # Use a worker to perform eventual maintenance work in a separate thread - self.run_task(ReturnConnection(self, conn)) + self.run_task(tasks.ReturnConnection(self, conn)) @property def closed(self) -> bool: @@ -285,7 +277,7 @@ class ConnectionPool: # Stop the worker threads for i in range(len(self._workers)): - self.run_task(StopWorker(self)) + self.run_task(tasks.StopWorker(self)) # Signal to eventual clients in the queue that business is closed. for pos in waiting: @@ -309,16 +301,16 @@ class ConnectionPool: timeout, ) - def run_task(self, task: "MaintenanceTask") -> None: + def run_task(self, task: tasks.MaintenanceTask) -> None: """Run a maintenance task in a worker thread.""" self._tasks.put(task) - def schedule_task(self, task: "MaintenanceTask", delay: float) -> None: + def schedule_task(self, task: tasks.MaintenanceTask, delay: float) -> None: """Run a maintenance task in a worker thread in the future.""" self._sched.enter(delay, task.tick) @classmethod - def worker(cls, q: "Queue[MaintenanceTask]") -> None: + def worker(cls, q: "Queue[tasks.MaintenanceTask]") -> None: """Runner to execute pending maintenance task. The function is designed to run as a separate thread. @@ -343,7 +335,7 @@ class ConnectionPool: "task run %s failed: %s: %s", task, e.__class__.__name__, e ) - if isinstance(task, StopWorker): + if isinstance(task, tasks.StopWorker): return def configure(self, conn: Connection) -> None: @@ -381,7 +373,7 @@ class ConnectionPool: if trigger_event: event.set() - def _add_connection(self, attempt: Optional["ConnectionAttempt"]) -> None: + def _add_connection(self, attempt: Optional[ConnectionAttempt]) -> None: """Try to connect and add the connection to the pool. If failed, reschedule a new attempt in the future for a few times, then @@ -410,7 +402,9 @@ class ConnectionPool: self.reconnect_failed() else: attempt.update_delay(now) - self.schedule_task(AddConnection(self, attempt), attempt.delay) + self.schedule_task( + tasks.AddConnection(self, attempt), attempt.delay + ) else: self._add_to_pool(conn) @@ -422,7 +416,7 @@ class ConnectionPool: if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: # Connection no more in working state: create a new one. logger.warning("discarding closed connection: %s", conn) - self.run_task(AddConnection(self)) + self.run_task(tasks.AddConnection(self)) else: self._add_to_pool(conn) @@ -569,137 +563,3 @@ class WaitingClient: self.error = error self._cond.notify_all() return True - - -class ConnectionAttempt: - """Keep the state of a connection attempt.""" - - INITIAL_DELAY = 1.0 - DELAY_JITTER = 0.1 - DELAY_BACKOFF = 2.0 - - def __init__(self, *, reconnect_timeout: float): - self.reconnect_timeout = reconnect_timeout - self.delay = 0.0 - self.give_up_at = 0.0 - - def update_delay(self, now: float) -> None: - """Calculate how long to wait for a new connection attempt""" - if self.delay == 0.0: - self.give_up_at = now + self.reconnect_timeout - # +/- 10% of the initial delay - jitter = self.INITIAL_DELAY * ( - (2.0 * self.DELAY_JITTER * random.random()) - self.DELAY_JITTER - ) - self.delay = self.INITIAL_DELAY + jitter - else: - self.delay *= self.DELAY_BACKOFF - - if self.delay + now > self.give_up_at: - self.delay = max(0.0, self.give_up_at - now) - - def time_to_give_up(self, now: float) -> bool: - """Return True if we are tired of trying to connect. Meh.""" - return self.give_up_at > 0.0 and now >= self.give_up_at - - -class MaintenanceTask(ABC): - """A task to run asynchronously to maintain the pool state.""" - - def __init__(self, pool: ConnectionPool): - self.pool = ref(pool) - logger.debug("task created: %s", self) - - def __repr__(self) -> str: - pool = self.pool() - name = repr(pool.name) if pool else "" - return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>" - - def run(self) -> None: - """Run the task. - - This usually happens in a worker thread. Call the concrete _run() - implementation, if the pool is still alive. - """ - pool = self.pool() - if not pool or pool.closed: - # Pool is no more working. Quietly discard the operation. - return - - logger.debug("task running: %s", self) - self._run(pool) - - def tick(self) -> None: - """Run the scheduled task - - This function is called by the scheduler thread. Use a worker to - run the task for real in order to free the scheduler immediately. - """ - pool = self.pool() - if not pool or pool.closed: - # Pool is no more working. Quietly discard the operation. - return - - pool.run_task(self) - - @abstractmethod - def _run(self, pool: ConnectionPool) -> None: - ... - - -class StopWorker(MaintenanceTask): - """Signal the maintenance thread to terminate.""" - - def _run(self, pool: ConnectionPool) -> None: - pass - - -class AddInitialConnection(MaintenanceTask): - """Add a new connection into to the pool. - - If the desired number of connections is reached notify the event. - """ - - def __init__(self, pool: ConnectionPool, event: threading.Event): - super().__init__(pool) - self.event = event - - def _run(self, pool: ConnectionPool) -> None: - pool._add_initial_connection(self.event) - - -class AddConnection(MaintenanceTask): - def __init__( - self, pool: ConnectionPool, attempt: Optional[ConnectionAttempt] = None - ): - super().__init__(pool) - self.attempt = attempt - - def _run(self, pool: ConnectionPool) -> None: - pool._add_connection(self.attempt) - - -class ReturnConnection(MaintenanceTask): - """Clean up and return a connection to the pool.""" - - def __init__(self, pool: ConnectionPool, conn: Connection): - super().__init__(pool) - self.conn = conn - - def _run(self, pool: ConnectionPool) -> None: - pool._return_connection(self.conn) - - -class ShrinkPool(MaintenanceTask): - """If the pool can shrink, remove one connection. - - Re-schedule periodically and also reset the minimum number of connections - in the pool. - """ - - def _run(self, pool: ConnectionPool) -> None: - # Reschedule the task now so that in case of any error we don't lose - # the periodic run. - pool.schedule_task(self, pool.max_idle) - - pool._shrink_if_possible() diff --git a/psycopg3/psycopg3/pool/tasks.py b/psycopg3/psycopg3/pool/tasks.py new file mode 100644 index 000000000..fe1a11ba4 --- /dev/null +++ b/psycopg3/psycopg3/pool/tasks.py @@ -0,0 +1,122 @@ +""" +Maintenance tasks for the connection pools. +""" + +# Copyright (C) 2021 The Psycopg Team + +import logging +import threading +from abc import ABC, abstractmethod +from typing import Optional, TYPE_CHECKING +from weakref import ref + +if TYPE_CHECKING: + from .base import ConnectionAttempt + from .pool import ConnectionPool + from ..connection import Connection + +logger = logging.getLogger(__name__) + + +class MaintenanceTask(ABC): + """A task to run asynchronously to maintain the pool state.""" + + def __init__(self, pool: "ConnectionPool"): + self.pool = ref(pool) + logger.debug("task created: %s", self) + + def __repr__(self) -> str: + pool = self.pool() + name = repr(pool.name) if pool else "" + return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>" + + def run(self) -> None: + """Run the task. + + This usually happens in a worker thread. Call the concrete _run() + implementation, if the pool is still alive. + """ + pool = self.pool() + if not pool or pool.closed: + # Pool is no more working. Quietly discard the operation. + return + + logger.debug("task running: %s", self) + self._run(pool) + + def tick(self) -> None: + """Run the scheduled task + + This function is called by the scheduler thread. Use a worker to + run the task for real in order to free the scheduler immediately. + """ + pool = self.pool() + if not pool or pool.closed: + # Pool is no more working. Quietly discard the operation. + return + + pool.run_task(self) + + @abstractmethod + def _run(self, pool: "ConnectionPool") -> None: + ... + + +class StopWorker(MaintenanceTask): + """Signal the maintenance thread to terminate.""" + + def _run(self, pool: "ConnectionPool") -> None: + pass + + +class AddInitialConnection(MaintenanceTask): + """Add a new connection into to the pool. + + If the desired number of connections is reached notify the event. + """ + + def __init__(self, pool: "ConnectionPool", event: threading.Event): + super().__init__(pool) + self.event = event + + def _run(self, pool: "ConnectionPool") -> None: + pool._add_initial_connection(self.event) + + +class AddConnection(MaintenanceTask): + def __init__( + self, + pool: "ConnectionPool", + attempt: Optional["ConnectionAttempt"] = None, + ): + super().__init__(pool) + self.attempt = attempt + + def _run(self, pool: "ConnectionPool") -> None: + pool._add_connection(self.attempt) + + +class ReturnConnection(MaintenanceTask): + """Clean up and return a connection to the pool.""" + + def __init__(self, pool: "ConnectionPool", conn: "Connection"): + super().__init__(pool) + self.conn = conn + + def _run(self, pool: "ConnectionPool") -> None: + pool._return_connection(self.conn) + + +class ShrinkPool(MaintenanceTask): + """If the pool can shrink, remove one connection. + + Re-schedule periodically and also reset the minimum number of connections + in the pool. + """ + + def _run(self, pool: "ConnectionPool") -> None: + # Reschedule the task now so that in case of any error we don't lose + # the periodic run. + pool.schedule_task(self, pool.max_idle) + + pool._shrink_if_possible() diff --git a/tests/pool/test_pool.py b/tests/pool/test_pool.py index a80c8fb1d..93c4cb37c 100644 --- a/tests/pool/test_pool.py +++ b/tests/pool/test_pool.py @@ -486,7 +486,9 @@ def test_grow(dsn, monkeypatch): @pytest.mark.slow def test_shrink(dsn, monkeypatch): - orig_run = pool.pool.ShrinkPool._run + from psycopg3.pool.tasks import ShrinkPool + + orig_run = ShrinkPool._run results = [] def run_hacked(self, pool): @@ -495,7 +497,7 @@ def test_shrink(dsn, monkeypatch): n1 = pool._nconns results.append((n0, n1)) - monkeypatch.setattr(pool.pool.ShrinkPool, "_run", run_hacked) + monkeypatch.setattr(ShrinkPool, "_run", run_hacked) p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, max_idle=0.2) assert p.max_idle == 0.2