]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Split the pool package objects in modules
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 25 Feb 2021 19:36:48 +0000 (20:36 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 12 Mar 2021 04:07:25 +0000 (05:07 +0100)
psycopg3/psycopg3/pool/__init__.py
psycopg3/psycopg3/pool/base.py [new file with mode: 0644]
psycopg3/psycopg3/pool/errors.py [new file with mode: 0644]
psycopg3/psycopg3/pool/pool.py
psycopg3/psycopg3/pool/tasks.py [new file with mode: 0644]
tests/pool/test_pool.py

index 222d89d8b615f98eb6d414406dfbe45bfc8ff829..327dcfc86c507d4af072e8933f55aa9c5cfb8b44 100644 (file)
@@ -4,6 +4,7 @@ psycopg3 connection pool package
 
 # Copyright (C) 2021 The Psycopg Team
 
-from .pool import ConnectionPool, PoolClosed, PoolTimeout
+from .pool import ConnectionPool
+from .errors import PoolClosed, PoolTimeout
 
 __all__ = ["ConnectionPool", "PoolClosed", "PoolTimeout"]
diff --git a/psycopg3/psycopg3/pool/base.py b/psycopg3/psycopg3/pool/base.py
new file mode 100644 (file)
index 0000000..ad76dab
--- /dev/null
@@ -0,0 +1,39 @@
+"""
+psycopg3 connection pool base class and functionalities.
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+import random
+
+
+class ConnectionAttempt:
+    """Keep the state of a connection attempt."""
+
+    INITIAL_DELAY = 1.0
+    DELAY_JITTER = 0.1
+    DELAY_BACKOFF = 2.0
+
+    def __init__(self, *, reconnect_timeout: float):
+        self.reconnect_timeout = reconnect_timeout
+        self.delay = 0.0
+        self.give_up_at = 0.0
+
+    def update_delay(self, now: float) -> None:
+        """Calculate how long to wait for a new connection attempt"""
+        if self.delay == 0.0:
+            self.give_up_at = now + self.reconnect_timeout
+            # +/- 10% of the initial delay
+            jitter = self.INITIAL_DELAY * (
+                (2.0 * self.DELAY_JITTER * random.random()) - self.DELAY_JITTER
+            )
+            self.delay = self.INITIAL_DELAY + jitter
+        else:
+            self.delay *= self.DELAY_BACKOFF
+
+        if self.delay + now > self.give_up_at:
+            self.delay = max(0.0, self.give_up_at - now)
+
+    def time_to_give_up(self, now: float) -> bool:
+        """Return True if we are tired of trying to connect. Meh."""
+        return self.give_up_at > 0.0 and now >= self.give_up_at
diff --git a/psycopg3/psycopg3/pool/errors.py b/psycopg3/psycopg3/pool/errors.py
new file mode 100644 (file)
index 0000000..12f8fa6
--- /dev/null
@@ -0,0 +1,19 @@
+"""
+Connection pool errors.
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+from .. import errors as e
+
+
+class PoolClosed(e.OperationalError):
+    """Attempt to get a connection from a closed pool."""
+
+    __module__ = "psycopg3.pool"
+
+
+class PoolTimeout(e.OperationalError):
+    """The pool couldn't provide a connection in acceptable time."""
+
+    __module__ = "psycopg3.pool"
index c544c0662875ab7ec897fb854c60a11b684f9b94..b6718f9f26e2955d9ff4514fcf8b19229bc68449 100644 (file)
@@ -8,32 +8,24 @@ import time
 import random
 import logging
 import threading
-from abc import ABC, abstractmethod
 from queue import Queue, Empty
 from typing import Any, Callable, Deque, Dict, Iterator, List, Optional
-from weakref import ref
 from contextlib import contextmanager
 from collections import deque
 
-from .. import errors as e
 from ..pq import TransactionStatus
 from ..connection import Connection
 
+from . import tasks
+from .base import ConnectionAttempt
 from .sched import Scheduler
+from .errors import PoolClosed, PoolTimeout
 
 logger = logging.getLogger(__name__)
 
 WORKER_TIMEOUT = 60.0
 
 
-class PoolTimeout(e.OperationalError):
-    pass
-
-
-class PoolClosed(e.OperationalError):
-    pass
-
-
 class ConnectionPool:
 
     _num_pool = 0
@@ -95,7 +87,7 @@ class ConnectionPool:
         # max_idle interval they weren't all used.
         self._nconns_min = minconn
 
-        self._tasks: "Queue[MaintenanceTask]" = Queue()
+        self._tasks: "Queue[tasks.MaintenanceTask]" = Queue()
         self._workers: List[threading.Thread] = []
         for i in range(num_workers):
             t = threading.Thread(
@@ -121,7 +113,7 @@ class ConnectionPool:
         if setup_timeout > 0:
             event = threading.Event()
             for i in range(self._nconns):
-                self.run_task(AddInitialConnection(self, event))
+                self.run_task(tasks.AddInitialConnection(self, event))
 
             # Wait for the pool to be full or throw an error
             if not event.wait(timeout=setup_timeout):
@@ -131,12 +123,12 @@ class ConnectionPool:
                 )
         else:
             for i in range(self._nconns):
-                self.run_task(AddConnection(self))
+                self.run_task(tasks.AddConnection(self))
 
         # Schedule a task to shrink the pool if connections over minconn have
         # remained unused. However if the pool cannot't grow don't bother.
         if maxconn > minconn:
-            self.schedule_task(ShrinkPool(self), self.max_idle)
+            self.schedule_task(tasks.ShrinkPool(self), self.max_idle)
 
     def __repr__(self) -> str:
         return (
@@ -206,7 +198,7 @@ class ConnectionPool:
                     logger.info(
                         "growing pool %r to %s", self.name, self._nconns
                     )
-                    self.run_task(AddConnection(self))
+                    self.run_task(tasks.AddConnection(self))
 
         # If we are in the waiting queue, wait to be assigned a connection
         # (outside the critical section, so only the waiting client is locked)
@@ -249,7 +241,7 @@ class ConnectionPool:
             return
 
         # Use a worker to perform eventual maintenance work in a separate thread
-        self.run_task(ReturnConnection(self, conn))
+        self.run_task(tasks.ReturnConnection(self, conn))
 
     @property
     def closed(self) -> bool:
@@ -285,7 +277,7 @@ class ConnectionPool:
 
         # Stop the worker threads
         for i in range(len(self._workers)):
-            self.run_task(StopWorker(self))
+            self.run_task(tasks.StopWorker(self))
 
         # Signal to eventual clients in the queue that business is closed.
         for pos in waiting:
@@ -309,16 +301,16 @@ class ConnectionPool:
                         timeout,
                     )
 
-    def run_task(self, task: "MaintenanceTask") -> None:
+    def run_task(self, task: tasks.MaintenanceTask) -> None:
         """Run a maintenance task in a worker thread."""
         self._tasks.put(task)
 
-    def schedule_task(self, task: "MaintenanceTask", delay: float) -> None:
+    def schedule_task(self, task: tasks.MaintenanceTask, delay: float) -> None:
         """Run a maintenance task in a worker thread in the future."""
         self._sched.enter(delay, task.tick)
 
     @classmethod
-    def worker(cls, q: "Queue[MaintenanceTask]") -> None:
+    def worker(cls, q: "Queue[tasks.MaintenanceTask]") -> None:
         """Runner to execute pending maintenance task.
 
         The function is designed to run as a separate thread.
@@ -343,7 +335,7 @@ class ConnectionPool:
                     "task run %s failed: %s: %s", task, e.__class__.__name__, e
                 )
 
-            if isinstance(task, StopWorker):
+            if isinstance(task, tasks.StopWorker):
                 return
 
     def configure(self, conn: Connection) -> None:
@@ -381,7 +373,7 @@ class ConnectionPool:
         if trigger_event:
             event.set()
 
-    def _add_connection(self, attempt: Optional["ConnectionAttempt"]) -> None:
+    def _add_connection(self, attempt: Optional[ConnectionAttempt]) -> None:
         """Try to connect and add the connection to the pool.
 
         If failed, reschedule a new attempt in the future for a few times, then
@@ -410,7 +402,9 @@ class ConnectionPool:
                 self.reconnect_failed()
             else:
                 attempt.update_delay(now)
-                self.schedule_task(AddConnection(self, attempt), attempt.delay)
+                self.schedule_task(
+                    tasks.AddConnection(self, attempt), attempt.delay
+                )
         else:
             self._add_to_pool(conn)
 
@@ -422,7 +416,7 @@ class ConnectionPool:
         if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
             # Connection no more in working state: create a new one.
             logger.warning("discarding closed connection: %s", conn)
-            self.run_task(AddConnection(self))
+            self.run_task(tasks.AddConnection(self))
         else:
             self._add_to_pool(conn)
 
@@ -569,137 +563,3 @@ class WaitingClient:
             self.error = error
             self._cond.notify_all()
             return True
-
-
-class ConnectionAttempt:
-    """Keep the state of a connection attempt."""
-
-    INITIAL_DELAY = 1.0
-    DELAY_JITTER = 0.1
-    DELAY_BACKOFF = 2.0
-
-    def __init__(self, *, reconnect_timeout: float):
-        self.reconnect_timeout = reconnect_timeout
-        self.delay = 0.0
-        self.give_up_at = 0.0
-
-    def update_delay(self, now: float) -> None:
-        """Calculate how long to wait for a new connection attempt"""
-        if self.delay == 0.0:
-            self.give_up_at = now + self.reconnect_timeout
-            # +/- 10% of the initial delay
-            jitter = self.INITIAL_DELAY * (
-                (2.0 * self.DELAY_JITTER * random.random()) - self.DELAY_JITTER
-            )
-            self.delay = self.INITIAL_DELAY + jitter
-        else:
-            self.delay *= self.DELAY_BACKOFF
-
-        if self.delay + now > self.give_up_at:
-            self.delay = max(0.0, self.give_up_at - now)
-
-    def time_to_give_up(self, now: float) -> bool:
-        """Return True if we are tired of trying to connect. Meh."""
-        return self.give_up_at > 0.0 and now >= self.give_up_at
-
-
-class MaintenanceTask(ABC):
-    """A task to run asynchronously to maintain the pool state."""
-
-    def __init__(self, pool: ConnectionPool):
-        self.pool = ref(pool)
-        logger.debug("task created: %s", self)
-
-    def __repr__(self) -> str:
-        pool = self.pool()
-        name = repr(pool.name) if pool else "<pool is gone>"
-        return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>"
-
-    def run(self) -> None:
-        """Run the task.
-
-        This usually happens in a worker thread. Call the concrete _run()
-        implementation, if the pool is still alive.
-        """
-        pool = self.pool()
-        if not pool or pool.closed:
-            # Pool is no more working. Quietly discard the operation.
-            return
-
-        logger.debug("task running: %s", self)
-        self._run(pool)
-
-    def tick(self) -> None:
-        """Run the scheduled task
-
-        This function is called by the scheduler thread. Use a worker to
-        run the task for real in order to free the scheduler immediately.
-        """
-        pool = self.pool()
-        if not pool or pool.closed:
-            # Pool is no more working. Quietly discard the operation.
-            return
-
-        pool.run_task(self)
-
-    @abstractmethod
-    def _run(self, pool: ConnectionPool) -> None:
-        ...
-
-
-class StopWorker(MaintenanceTask):
-    """Signal the maintenance thread to terminate."""
-
-    def _run(self, pool: ConnectionPool) -> None:
-        pass
-
-
-class AddInitialConnection(MaintenanceTask):
-    """Add a new connection into to the pool.
-
-    If the desired number of connections is reached notify the event.
-    """
-
-    def __init__(self, pool: ConnectionPool, event: threading.Event):
-        super().__init__(pool)
-        self.event = event
-
-    def _run(self, pool: ConnectionPool) -> None:
-        pool._add_initial_connection(self.event)
-
-
-class AddConnection(MaintenanceTask):
-    def __init__(
-        self, pool: ConnectionPool, attempt: Optional[ConnectionAttempt] = None
-    ):
-        super().__init__(pool)
-        self.attempt = attempt
-
-    def _run(self, pool: ConnectionPool) -> None:
-        pool._add_connection(self.attempt)
-
-
-class ReturnConnection(MaintenanceTask):
-    """Clean up and return a connection to the pool."""
-
-    def __init__(self, pool: ConnectionPool, conn: Connection):
-        super().__init__(pool)
-        self.conn = conn
-
-    def _run(self, pool: ConnectionPool) -> None:
-        pool._return_connection(self.conn)
-
-
-class ShrinkPool(MaintenanceTask):
-    """If the pool can shrink, remove one connection.
-
-    Re-schedule periodically and also reset the minimum number of connections
-    in the pool.
-    """
-
-    def _run(self, pool: ConnectionPool) -> None:
-        # Reschedule the task now so that in case of any error we don't lose
-        # the periodic run.
-        pool.schedule_task(self, pool.max_idle)
-
-        pool._shrink_if_possible()
diff --git a/psycopg3/psycopg3/pool/tasks.py b/psycopg3/psycopg3/pool/tasks.py
new file mode 100644 (file)
index 0000000..fe1a11b
--- /dev/null
@@ -0,0 +1,122 @@
+"""
+Maintenance tasks for the connection pools.
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+import logging
+import threading
+from abc import ABC, abstractmethod
+from typing import Optional, TYPE_CHECKING
+from weakref import ref
+
+if TYPE_CHECKING:
+    from .base import ConnectionAttempt
+    from .pool import ConnectionPool
+    from ..connection import Connection
+
+logger = logging.getLogger(__name__)
+
+
+class MaintenanceTask(ABC):
+    """A task to run asynchronously to maintain the pool state."""
+
+    def __init__(self, pool: "ConnectionPool"):
+        self.pool = ref(pool)
+        logger.debug("task created: %s", self)
+
+    def __repr__(self) -> str:
+        pool = self.pool()
+        name = repr(pool.name) if pool else "<pool is gone>"
+        return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>"
+
+    def run(self) -> None:
+        """Run the task.
+
+        This usually happens in a worker thread. Call the concrete _run()
+        implementation, if the pool is still alive.
+        """
+        pool = self.pool()
+        if not pool or pool.closed:
+            # Pool is no more working. Quietly discard the operation.
+            return
+
+        logger.debug("task running: %s", self)
+        self._run(pool)
+
+    def tick(self) -> None:
+        """Run the scheduled task
+
+        This function is called by the scheduler thread. Use a worker to
+        run the task for real in order to free the scheduler immediately.
+        """
+        pool = self.pool()
+        if not pool or pool.closed:
+            # Pool is no more working. Quietly discard the operation.
+            return
+
+        pool.run_task(self)
+
+    @abstractmethod
+    def _run(self, pool: "ConnectionPool") -> None:
+        ...
+
+
+class StopWorker(MaintenanceTask):
+    """Signal the maintenance thread to terminate."""
+
+    def _run(self, pool: "ConnectionPool") -> None:
+        pass
+
+
+class AddInitialConnection(MaintenanceTask):
+    """Add a new connection into to the pool.
+
+    If the desired number of connections is reached notify the event.
+    """
+
+    def __init__(self, pool: "ConnectionPool", event: threading.Event):
+        super().__init__(pool)
+        self.event = event
+
+    def _run(self, pool: "ConnectionPool") -> None:
+        pool._add_initial_connection(self.event)
+
+
+class AddConnection(MaintenanceTask):
+    def __init__(
+        self,
+        pool: "ConnectionPool",
+        attempt: Optional["ConnectionAttempt"] = None,
+    ):
+        super().__init__(pool)
+        self.attempt = attempt
+
+    def _run(self, pool: "ConnectionPool") -> None:
+        pool._add_connection(self.attempt)
+
+
+class ReturnConnection(MaintenanceTask):
+    """Clean up and return a connection to the pool."""
+
+    def __init__(self, pool: "ConnectionPool", conn: "Connection"):
+        super().__init__(pool)
+        self.conn = conn
+
+    def _run(self, pool: "ConnectionPool") -> None:
+        pool._return_connection(self.conn)
+
+
+class ShrinkPool(MaintenanceTask):
+    """If the pool can shrink, remove one connection.
+
+    Re-schedule periodically and also reset the minimum number of connections
+    in the pool.
+    """
+
+    def _run(self, pool: "ConnectionPool") -> None:
+        # Reschedule the task now so that in case of any error we don't lose
+        # the periodic run.
+        pool.schedule_task(self, pool.max_idle)
+
+        pool._shrink_if_possible()
index a80c8fb1d95249af7c8cd79564bcdb64cfa39238..93c4cb37cdd64930dbc9c373774be6be71086882 100644 (file)
@@ -486,7 +486,9 @@ def test_grow(dsn, monkeypatch):
 @pytest.mark.slow
 def test_shrink(dsn, monkeypatch):
 
-    orig_run = pool.pool.ShrinkPool._run
+    from psycopg3.pool.tasks import ShrinkPool
+
+    orig_run = ShrinkPool._run
     results = []
 
     def run_hacked(self, pool):
@@ -495,7 +497,7 @@ def test_shrink(dsn, monkeypatch):
         n1 = pool._nconns
         results.append((n0, n1))
 
-    monkeypatch.setattr(pool.pool.ShrinkPool, "_run", run_hacked)
+    monkeypatch.setattr(ShrinkPool, "_run", run_hacked)
 
     p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, max_idle=0.2)
     assert p.max_idle == 0.2