From: Daniele Varrazzo Date: Mon, 22 Feb 2021 01:33:36 +0000 (+0100) Subject: Make any pool task schedulable X-Git-Tag: 3.0.dev0~87^2~51 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9009f52091e38767b69a02a18778b93d637eb4a2;p=thirdparty%2Fpsycopg.git Make any pool task schedulable Added interface to the pool to schedule a task in the future. --- diff --git a/psycopg3/psycopg3/pool.py b/psycopg3/psycopg3/pool.py index 8eca7a203..20b3159d5 100644 --- a/psycopg3/psycopg3/pool.py +++ b/psycopg3/psycopg3/pool.py @@ -85,7 +85,7 @@ class ConnectionPool: self._pool: Deque[Tuple[Connection, float]] = deque() self._waiting: Deque["WaitingClient"] = deque() self._lock = threading.RLock() - self.sched = Scheduler() + self._sched = Scheduler() self._wqueue: "Queue[MaintenanceTask]" = Queue() self._workers: List[threading.Thread] = [] @@ -94,7 +94,7 @@ class ConnectionPool: t.daemon = True self._workers.append(t) - self._sched_runner = threading.Thread(target=self.sched.run) + self._sched_runner = threading.Thread(target=self._sched.run) self._sched_runner.daemon = True # _close should be the last property to be set in the state @@ -111,7 +111,7 @@ class ConnectionPool: if setup_timeout > 0: event = threading.Event() for i in range(self._nconns): - self.add_task(AddInitialConnection(self, event)) + self.run_task(AddInitialConnection(self, event)) # Wait for the pool to be full or throw an error if not event.wait(timeout=setup_timeout): @@ -121,7 +121,7 @@ class ConnectionPool: ) else: for i in range(self._nconns): - self.add_task(AddConnection(self)) + self.run_task(AddConnection(self)) def __repr__(self) -> str: return ( @@ -187,7 +187,7 @@ class ConnectionPool: if self._nconns < self.maxconn: logger.debug("growing pool %r", self.name) self._nconns += 1 - self.add_task(AddConnection(self)) + self.run_task(AddConnection(self)) # If we are in the waiting queue, wait to be assigned a connection # (outside the critical section, so only the waiting client is locked) @@ -230,7 +230,7 @@ class ConnectionPool: return # Use a worker to perform eventual maintenance work in a separate thread - self.add_task(ReturnConnection(self, conn)) + self.run_task(ReturnConnection(self, conn)) def _add_to_pool(self, conn: Connection) -> None: """ @@ -247,7 +247,7 @@ class ConnectionPool: if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: # Connection no more in working state: create a new one. logger.warning("discarding closed connection: %s", conn) - self.add_task(AddConnection(self)) + self.run_task(AddConnection(self)) return pos: Optional[WaitingClient] = None @@ -336,11 +336,11 @@ class ConnectionPool: # putconn will just close the returned connection. # Stop the scheduler - self.sched.enter(0, None) + self._sched.enter(0, None) # Stop the worker threads for i in range(len(self._workers)): - self.add_task(StopWorker(self)) + self.run_task(StopWorker(self)) # Signal to eventual clients in the queue that business is closed. while self._waiting: @@ -366,10 +366,19 @@ class ConnectionPool: timeout, ) - def add_task(self, task: "MaintenanceTask") -> None: - """Add a task to the queue of tasts to perform.""" + def run_task(self, task: "MaintenanceTask") -> None: + """Run a maintenance task in a worker thread.""" self._wqueue.put(task) + def schedule_task( + self, task: "MaintenanceTask", delay: float, absolute: bool = False + ) -> None: + """Run a maintenance task in a worker thread in the future.""" + if absolute: + self._sched.enterabs(delay, task.tick) + else: + self._sched.enter(delay, task.tick) + @classmethod def worker(cls, q: "Queue[MaintenanceTask]") -> None: """Runner to execute pending maintenance task. @@ -481,7 +490,7 @@ class WaitingClient: class MaintenanceTask(ABC): - """A task run asynchronously to maintain the pool state.""" + """A task to run asynchronously to maintain the pool state.""" def __init__(self, pool: ConnectionPool): self.pool = ref(pool) @@ -493,14 +502,32 @@ class MaintenanceTask(ABC): return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>" def run(self) -> None: + """Run the task. + + This usually happens in a worker thread. Call the concrete _run() + implementation, if the pool is still alive. + """ pool = self.pool() - if not pool: - # Pool has been deleted. Quietly discard operation. + if not pool or pool.closed: + # Pool is no more working. Quietly discard the operation. return logger.debug("task running: %s", self) self._run(pool) + def tick(self) -> None: + """Run the scheduled task + + This function is called by the scheduler thread. Use a worker to + run the task for real in order to free the scheduler immediately. + """ + pool = self.pool() + if not pool or pool.closed: + # Pool is no more working. Quietly discard the operation. + return + + pool.run_task(self) + @abstractmethod def _run(self, pool: ConnectionPool) -> None: ... @@ -580,18 +607,11 @@ class AddConnection(MaintenanceTask): else: self.delay *= self.DELAY_BACKOFF - # Schedule a run of self.retry() some time in the future + # Schedule a run of self.tick() some time in the future if now + self.delay < self.give_up_at: - pool.sched.enter(self.delay, self.retry) + pool.schedule_task(self, self.delay) else: - pool.sched.enterabs(self.give_up_at, self.retry) - - def retry(self) -> None: - pool = self.pool() - if not pool: - return - - pool.add_task(self) + pool.schedule_task(self, self.give_up_at, absolute=True) class ReturnConnection(MaintenanceTask):