self._pool: Deque[Tuple[Connection, float]] = deque()
self._waiting: Deque["WaitingClient"] = deque()
self._lock = threading.RLock()
- self.sched = Scheduler()
+ self._sched = Scheduler()
self._wqueue: "Queue[MaintenanceTask]" = Queue()
self._workers: List[threading.Thread] = []
t.daemon = True
self._workers.append(t)
- self._sched_runner = threading.Thread(target=self.sched.run)
+ self._sched_runner = threading.Thread(target=self._sched.run)
self._sched_runner.daemon = True
# _close should be the last property to be set in the state
if setup_timeout > 0:
event = threading.Event()
for i in range(self._nconns):
- self.add_task(AddInitialConnection(self, event))
+ self.run_task(AddInitialConnection(self, event))
# Wait for the pool to be full or throw an error
if not event.wait(timeout=setup_timeout):
)
else:
for i in range(self._nconns):
- self.add_task(AddConnection(self))
+ self.run_task(AddConnection(self))
def __repr__(self) -> str:
return (
if self._nconns < self.maxconn:
logger.debug("growing pool %r", self.name)
self._nconns += 1
- self.add_task(AddConnection(self))
+ self.run_task(AddConnection(self))
# If we are in the waiting queue, wait to be assigned a connection
# (outside the critical section, so only the waiting client is locked)
return
# Use a worker to perform eventual maintenance work in a separate thread
- self.add_task(ReturnConnection(self, conn))
+ self.run_task(ReturnConnection(self, conn))
def _add_to_pool(self, conn: Connection) -> None:
"""
if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
# Connection no more in working state: create a new one.
logger.warning("discarding closed connection: %s", conn)
- self.add_task(AddConnection(self))
+ self.run_task(AddConnection(self))
return
pos: Optional[WaitingClient] = None
# putconn will just close the returned connection.
# Stop the scheduler
- self.sched.enter(0, None)
+ self._sched.enter(0, None)
# Stop the worker threads
for i in range(len(self._workers)):
- self.add_task(StopWorker(self))
+ self.run_task(StopWorker(self))
# Signal to eventual clients in the queue that business is closed.
while self._waiting:
timeout,
)
- def add_task(self, task: "MaintenanceTask") -> None:
- """Add a task to the queue of tasts to perform."""
+ def run_task(self, task: "MaintenanceTask") -> None:
+ """Run a maintenance task in a worker thread."""
self._wqueue.put(task)
+ def schedule_task(
+ self, task: "MaintenanceTask", delay: float, absolute: bool = False
+ ) -> None:
+ """Run a maintenance task in a worker thread in the future."""
+ if absolute:
+ self._sched.enterabs(delay, task.tick)
+ else:
+ self._sched.enter(delay, task.tick)
+
@classmethod
def worker(cls, q: "Queue[MaintenanceTask]") -> None:
"""Runner to execute pending maintenance task.
class MaintenanceTask(ABC):
- """A task run asynchronously to maintain the pool state."""
+ """A task to run asynchronously to maintain the pool state."""
def __init__(self, pool: ConnectionPool):
self.pool = ref(pool)
return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>"
def run(self) -> None:
+ """Run the task.
+
+ This usually happens in a worker thread. Call the concrete _run()
+ implementation, if the pool is still alive.
+ """
pool = self.pool()
- if not pool:
- # Pool has been deleted. Quietly discard operation.
+ if not pool or pool.closed:
+ # Pool is no more working. Quietly discard the operation.
return
logger.debug("task running: %s", self)
self._run(pool)
+ def tick(self) -> None:
+ """Run the scheduled task
+
+ This function is called by the scheduler thread. Use a worker to
+ run the task for real in order to free the scheduler immediately.
+ """
+ pool = self.pool()
+ if not pool or pool.closed:
+ # Pool is no more working. Quietly discard the operation.
+ return
+
+ pool.run_task(self)
+
@abstractmethod
def _run(self, pool: ConnectionPool) -> None:
...
else:
self.delay *= self.DELAY_BACKOFF
- # Schedule a run of self.retry() some time in the future
+ # Schedule a run of self.tick() some time in the future
if now + self.delay < self.give_up_at:
- pool.sched.enter(self.delay, self.retry)
+ pool.schedule_task(self, self.delay)
else:
- pool.sched.enterabs(self.give_up_at, self.retry)
-
- def retry(self) -> None:
- pool = self.pool()
- if not pool:
- return
-
- pool.add_task(self)
+ pool.schedule_task(self, self.give_up_at, absolute=True)
class ReturnConnection(MaintenanceTask):