]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Make any pool task schedulable
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 22 Feb 2021 01:33:36 +0000 (02:33 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 12 Mar 2021 04:07:25 +0000 (05:07 +0100)
Added interface to the pool to schedule a task in the future.

psycopg3/psycopg3/pool.py

index 8eca7a203e54eacbd94ef4a0c07ad935ff3a2ae3..20b3159d58679f6d31967489356c58fa19cb9626 100644 (file)
@@ -85,7 +85,7 @@ class ConnectionPool:
         self._pool: Deque[Tuple[Connection, float]] = deque()
         self._waiting: Deque["WaitingClient"] = deque()
         self._lock = threading.RLock()
-        self.sched = Scheduler()
+        self._sched = Scheduler()
 
         self._wqueue: "Queue[MaintenanceTask]" = Queue()
         self._workers: List[threading.Thread] = []
@@ -94,7 +94,7 @@ class ConnectionPool:
             t.daemon = True
             self._workers.append(t)
 
-        self._sched_runner = threading.Thread(target=self.sched.run)
+        self._sched_runner = threading.Thread(target=self._sched.run)
         self._sched_runner.daemon = True
 
         # _close should be the last property to be set in the state
@@ -111,7 +111,7 @@ class ConnectionPool:
         if setup_timeout > 0:
             event = threading.Event()
             for i in range(self._nconns):
-                self.add_task(AddInitialConnection(self, event))
+                self.run_task(AddInitialConnection(self, event))
 
             # Wait for the pool to be full or throw an error
             if not event.wait(timeout=setup_timeout):
@@ -121,7 +121,7 @@ class ConnectionPool:
                 )
         else:
             for i in range(self._nconns):
-                self.add_task(AddConnection(self))
+                self.run_task(AddConnection(self))
 
     def __repr__(self) -> str:
         return (
@@ -187,7 +187,7 @@ class ConnectionPool:
                 if self._nconns < self.maxconn:
                     logger.debug("growing pool %r", self.name)
                     self._nconns += 1
-                    self.add_task(AddConnection(self))
+                    self.run_task(AddConnection(self))
 
         # If we are in the waiting queue, wait to be assigned a connection
         # (outside the critical section, so only the waiting client is locked)
@@ -230,7 +230,7 @@ class ConnectionPool:
             return
 
         # Use a worker to perform eventual maintenance work in a separate thread
-        self.add_task(ReturnConnection(self, conn))
+        self.run_task(ReturnConnection(self, conn))
 
     def _add_to_pool(self, conn: Connection) -> None:
         """
@@ -247,7 +247,7 @@ class ConnectionPool:
         if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
             # Connection no more in working state: create a new one.
             logger.warning("discarding closed connection: %s", conn)
-            self.add_task(AddConnection(self))
+            self.run_task(AddConnection(self))
             return
 
         pos: Optional[WaitingClient] = None
@@ -336,11 +336,11 @@ class ConnectionPool:
         # putconn will just close the returned connection.
 
         # Stop the scheduler
-        self.sched.enter(0, None)
+        self._sched.enter(0, None)
 
         # Stop the worker threads
         for i in range(len(self._workers)):
-            self.add_task(StopWorker(self))
+            self.run_task(StopWorker(self))
 
         # Signal to eventual clients in the queue that business is closed.
         while self._waiting:
@@ -366,10 +366,19 @@ class ConnectionPool:
                         timeout,
                     )
 
-    def add_task(self, task: "MaintenanceTask") -> None:
-        """Add a task to the queue of tasts to perform."""
+    def run_task(self, task: "MaintenanceTask") -> None:
+        """Run a maintenance task in a worker thread."""
         self._wqueue.put(task)
 
+    def schedule_task(
+        self, task: "MaintenanceTask", delay: float, absolute: bool = False
+    ) -> None:
+        """Run a maintenance task in a worker thread in the future."""
+        if absolute:
+            self._sched.enterabs(delay, task.tick)
+        else:
+            self._sched.enter(delay, task.tick)
+
     @classmethod
     def worker(cls, q: "Queue[MaintenanceTask]") -> None:
         """Runner to execute pending maintenance task.
@@ -481,7 +490,7 @@ class WaitingClient:
 
 
 class MaintenanceTask(ABC):
-    """A task run asynchronously to maintain the pool state."""
+    """A task to run asynchronously to maintain the pool state."""
 
     def __init__(self, pool: ConnectionPool):
         self.pool = ref(pool)
@@ -493,14 +502,32 @@ class MaintenanceTask(ABC):
         return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>"
 
     def run(self) -> None:
+        """Run the task.
+
+        This usually happens in a worker thread. Call the concrete _run()
+        implementation, if the pool is still alive.
+        """
         pool = self.pool()
-        if not pool:
-            # Pool has been deleted. Quietly discard operation.
+        if not pool or pool.closed:
+            # Pool is no more working. Quietly discard the operation.
             return
 
         logger.debug("task running: %s", self)
         self._run(pool)
 
+    def tick(self) -> None:
+        """Run the scheduled task
+
+        This function is called by the scheduler thread. Use a worker to
+        run the task for real in order to free the scheduler immediately.
+        """
+        pool = self.pool()
+        if not pool or pool.closed:
+            # Pool is no more working. Quietly discard the operation.
+            return
+
+        pool.run_task(self)
+
     @abstractmethod
     def _run(self, pool: ConnectionPool) -> None:
         ...
@@ -580,18 +607,11 @@ class AddConnection(MaintenanceTask):
         else:
             self.delay *= self.DELAY_BACKOFF
 
-        # Schedule a run of self.retry() some time in the future
+        # Schedule a run of self.tick() some time in the future
         if now + self.delay < self.give_up_at:
-            pool.sched.enter(self.delay, self.retry)
+            pool.schedule_task(self, self.delay)
         else:
-            pool.sched.enterabs(self.give_up_at, self.retry)
-
-    def retry(self) -> None:
-        pool = self.pool()
-        if not pool:
-            return
-
-        pool.add_task(self)
+            pool.schedule_task(self, self.give_up_at, absolute=True)
 
 
 class ReturnConnection(MaintenanceTask):