# max_idle interval they weren't all used.
self._nconns_min = minconn
- self._wqueue: "Queue[MaintenanceTask]" = Queue()
+ self._tasks: "Queue[MaintenanceTask]" = Queue()
self._workers: List[threading.Thread] = []
for i in range(num_workers):
t = threading.Thread(
- target=self.worker, args=(self._wqueue,), daemon=True
+ target=self.worker, args=(self._tasks,), daemon=True
)
self._workers.append(t)
# Use a worker to perform eventual maintenance work in a separate thread
self.run_task(ReturnConnection(self, conn))
- def _add_to_pool(self, conn: Connection) -> None:
- """
- Add a connection to the pool.
-
- The connection can be a fresh one or one already used in the pool.
- """
- # Remove the pool reference from the connection before returning it
- # to the state, to avoid to create a reference loop.
- # Also disable the warning for open connection in conn.__del__
- conn._pool = None
-
- self._reset_transaction_status(conn)
- if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
- # Connection no more in working state: create a new one.
- logger.warning("discarding closed connection: %s", conn)
- self.run_task(AddConnection(self))
- return
-
- pos: Optional[WaitingClient] = None
- to_close: Optional[Connection] = None
-
- # Critical section: if there is a client waiting give it the connection
- # otherwise put it back into the pool.
- with self._lock:
- while self._waiting:
- # If there is a client waiting (which is still waiting and
- # hasn't timed out), give it the connection and notify it.
- pos = self._waiting.popleft()
- if pos.set(conn):
- break
-
- else:
- # No client waiting for a connection: put it back into the pool
- self._pool.append(conn)
-
- if to_close:
- to_close.close()
-
- def _reset_transaction_status(self, conn: Connection) -> None:
- """
- Bring a connection to IDLE state or close it.
- """
- status = conn.pgconn.transaction_status
- if status == TransactionStatus.IDLE:
- return
-
- if status in (TransactionStatus.INTRANS, TransactionStatus.INERROR):
- # Connection returned with an active transaction
- logger.warning("rolling back returned connection: %s", conn)
- try:
- conn.rollback()
- except Exception as e:
- logger.warning(
- "rollback failed: %s: %s. Discarding connection %s",
- e.__class__.__name__,
- e,
- conn,
- )
- conn.close()
-
- elif status == TransactionStatus.ACTIVE:
- # Connection returned during an operation. Bad... just close it.
- logger.warning("closing returned connection: %s", conn)
- conn.close()
-
@property
def closed(self) -> bool:
"""`!True` if the pool is closed."""
def run_task(self, task: "MaintenanceTask") -> None:
"""Run a maintenance task in a worker thread."""
- self._wqueue.put(task)
+ self._tasks.put(task)
- def schedule_task(
- self, task: "MaintenanceTask", delay: float, absolute: bool = False
- ) -> None:
+ def schedule_task(self, task: "MaintenanceTask", delay: float) -> None:
"""Run a maintenance task in a worker thread in the future."""
- if absolute:
- self._sched.enterabs(delay, task.tick)
- else:
- self._sched.enter(delay, task.tick)
+ self._sched.enter(delay, task.tick)
@classmethod
def worker(cls, q: "Queue[MaintenanceTask]") -> None:
if isinstance(task, StopWorker):
return
+ def configure(self, conn: Connection) -> None:
+ """Configure a connection after creation."""
+ self._configure(conn)
+
+ def reconnect_failed(self) -> None:
+ """
+ Called when reconnection failed for longer than `reconnect_timeout`.
+ """
+ self._reconnect_failed(self)
+
def _connect(self) -> Connection:
"""Return a new connection configured for the pool."""
conn = Connection.connect(self.conninfo, **self.kwargs)
conn._pool = self
return conn
- def configure(self, conn: Connection) -> None:
- """Configure a connection after creation."""
- self._configure(conn)
+ def _add_initial_connection(self, event: threading.Event) -> None:
+ """Create a new connection at the beginning of the pool life.
- def reconnect_failed(self) -> None:
+ Trigger *event* if all the connections necessary have been added.
"""
- Called when reconnection failed for longer than `reconnect_timeout`.
+ conn = self._connect()
+ conn._pool = None # avoid a reference loop
+
+ with self._lock:
+ assert (
+ not self._waiting
+ ), "clients waiting in a pool being initialised"
+ self._pool.append(conn)
+ trigger_event = len(self._pool) >= self._nconns
+
+ if trigger_event:
+ event.set()
+
+ def _add_connection(self, attempt: Optional["ConnectionAttempt"]) -> None:
+ """Try to connect and add the connection to the pool.
+
+ If failed, reschedule a new attempt in the future for a few times, then
+ give up, decrease the pool connections number and call
+ `self.reconnect_failed()`.
+
"""
- self._reconnect_failed(self)
+ now = time.monotonic()
+ if not attempt:
+ attempt = ConnectionAttempt(
+ reconnect_timeout=self.reconnect_timeout
+ )
+
+ try:
+ conn = self._connect()
+ except Exception as e:
+ logger.warning(f"error connecting in {self.name!r}: {e}")
+ if attempt.time_to_give_up(now):
+ logger.warning(
+ "reconnection attempt in pool %r failed after %s sec",
+ self.name,
+ self.reconnect_timeout,
+ )
+ with self._lock:
+ self._nconns -= 1
+ self.reconnect_failed()
+ else:
+ attempt.update_delay(now)
+ self.schedule_task(AddConnection(self, attempt), attempt.delay)
+ else:
+ self._add_to_pool(conn)
+
+ def _return_connection(self, conn: Connection) -> None:
+ """
+ Return a connection to the pool after usage.
+ """
+ self._reset_connection(conn)
+ if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ # Connection no more in working state: create a new one.
+ logger.warning("discarding closed connection: %s", conn)
+ self.run_task(AddConnection(self))
+ else:
+ self._add_to_pool(conn)
+
+ def _add_to_pool(self, conn: Connection) -> None:
+ """
+ Add a connection to the pool.
+
+ The connection can be a fresh one or one already used in the pool.
+
+ If a client is already waiting for a connection pass it on, otherwise
+ put it back into the pool
+ """
+ # Remove the pool reference from the connection before returning it
+ # to the state, to avoid to create a reference loop.
+ # Also disable the warning for open connection in conn.__del__
+ conn._pool = None
+
+ pos: Optional[WaitingClient] = None
+
+ # Critical section: if there is a client waiting give it the connection
+ # otherwise put it back into the pool.
+ with self._lock:
+ while self._waiting:
+ # If there is a client waiting (which is still waiting and
+ # hasn't timed out), give it the connection and notify it.
+ pos = self._waiting.popleft()
+ if pos.set(conn):
+ break
+
+ else:
+ # No client waiting for a connection: put it back into the pool
+ self._pool.append(conn)
+
+ def _reset_connection(self, conn: Connection) -> None:
+ """
+ Bring a connection to IDLE state or close it.
+ """
+ status = conn.pgconn.transaction_status
+ if status == TransactionStatus.IDLE:
+ return
+
+ if status in (TransactionStatus.INTRANS, TransactionStatus.INERROR):
+ # Connection returned with an active transaction
+ logger.warning("rolling back returned connection: %s", conn)
+ try:
+ conn.rollback()
+ except Exception as e:
+ logger.warning(
+ "rollback failed: %s: %s. Discarding connection %s",
+ e.__class__.__name__,
+ e,
+ conn,
+ )
+ conn.close()
+
+ elif status == TransactionStatus.ACTIVE:
+ # Connection returned during an operation. Bad... just close it.
+ logger.warning("closing returned connection: %s", conn)
+ conn.close()
+
+ def _shrink_if_possible(self) -> None:
+ to_close: Optional[Connection] = None
+
+ with self._lock:
+ # Reset the min number of connections used
+ nconns_min = self._nconns_min
+ self._nconns_min = len(self._pool)
+
+ # If the pool can shrink and connections were unused, drop one
+ if self._nconns > self.minconn and nconns_min > 0:
+ to_close = self._pool.popleft()
+ self._nconns -= 1
+
+ if to_close:
+ logger.info(
+ "shrinking pool %r to %s because %s unused connections"
+ " in the last %s sec",
+ self.name,
+ self._nconns,
+ nconns_min,
+ self.max_idle,
+ )
+ to_close.close()
class WaitingClient:
return True
+class ConnectionAttempt:
+ """Keep the state of a connection attempt."""
+
+ INITIAL_DELAY = 1.0
+ DELAY_JITTER = 0.1
+ DELAY_BACKOFF = 2.0
+
+ def __init__(self, *, reconnect_timeout: float):
+ self.reconnect_timeout = reconnect_timeout
+ self.delay = 0.0
+ self.give_up_at = 0.0
+
+ def update_delay(self, now: float) -> None:
+ """Calculate how long to wait for a new connection attempt"""
+ if self.delay == 0.0:
+ self.give_up_at = now + self.reconnect_timeout
+ # +/- 10% of the initial delay
+ jitter = self.INITIAL_DELAY * (
+ (2.0 * self.DELAY_JITTER * random.random()) - self.DELAY_JITTER
+ )
+ self.delay = self.INITIAL_DELAY + jitter
+ else:
+ self.delay *= self.DELAY_BACKOFF
+
+ if self.delay + now > self.give_up_at:
+ self.delay = max(0.0, self.give_up_at - now)
+
+ def time_to_give_up(self, now: float) -> bool:
+ """Return True if we are tired of trying to connect. Meh."""
+ return self.give_up_at > 0.0 and now >= self.give_up_at
+
+
class MaintenanceTask(ABC):
"""A task to run asynchronously to maintain the pool state."""
self.event = event
def _run(self, pool: ConnectionPool) -> None:
- conn = pool._connect()
- pool._add_to_pool(conn)
- if len(pool._pool) >= pool._nconns:
- self.event.set()
+ pool._add_initial_connection(self.event)
class AddConnection(MaintenanceTask):
- INITIAL_DELAY = 1.0
- DELAY_JITTER = 0.1
- DELAY_BACKOFF = 2.0
-
- def __init__(self, pool: ConnectionPool):
+ def __init__(
+ self, pool: ConnectionPool, attempt: Optional[ConnectionAttempt] = None
+ ):
super().__init__(pool)
- self.delay = 0.0
- self.give_up_at = 0.0
+ self.attempt = attempt
def _run(self, pool: ConnectionPool) -> None:
- try:
- conn = pool._connect()
- except Exception as e:
- logger.warning(f"error reconnecting in {pool.name!r}: {e}")
- self._handle_error(pool)
- else:
- pool._add_to_pool(conn)
-
- def _handle_error(self, pool: ConnectionPool) -> None:
- """Called after a connection failure.
-
- Calculate the new time for a new reconnection attempt and schedule a
- retry in the future. If too many attempts were performed, give up, by
- decreasing the pool connection number and calling
- `pool.reconnect_failed()`.
- """
- now = time.monotonic()
- if self.give_up_at and now >= self.give_up_at:
- logger.warning(
- "reconnection attempt in pool %r failed after %s sec",
- pool.name,
- pool.reconnect_timeout,
- )
- with pool._lock:
- pool._nconns -= 1
- pool.reconnect_failed()
- return
-
- # Calculate how long to wait for a new connection attempt
- if self.delay == 0.0:
- self.give_up_at = now + pool.reconnect_timeout
- # +/- 10% of the initial delay
- jitter = self.INITIAL_DELAY * (
- (2.0 * self.DELAY_JITTER * random.random()) - self.DELAY_JITTER
- )
- self.delay = self.INITIAL_DELAY + jitter
- else:
- self.delay *= self.DELAY_BACKOFF
-
- # Schedule a run of self.tick() some time in the future
- if now + self.delay < self.give_up_at:
- pool.schedule_task(self, self.delay)
- else:
- pool.schedule_task(self, self.give_up_at, absolute=True)
+ pool._add_connection(self.attempt)
class ReturnConnection(MaintenanceTask):
self.conn = conn
def _run(self, pool: ConnectionPool) -> None:
- pool._add_to_pool(self.conn)
+ pool._return_connection(self.conn)
class ShrinkPool(MaintenanceTask):
# the periodic run.
pool.schedule_task(self, pool.max_idle)
- to_close: Optional[Connection] = None
-
- with pool._lock:
- # Reset the min number of connections used
- nconns_min = pool._nconns_min
- pool._nconns_min = len(pool._pool)
-
- # If the pool can shrink and connections were unused, drop one
- if pool._nconns > pool.minconn and nconns_min > 0:
- to_close = pool._pool.popleft()
- pool._nconns -= 1
-
- if to_close:
- logger.info(
- "shrinking pool %r to %s because %s unused connections"
- " in the last %s sec",
- pool.name,
- pool._nconns,
- nconns_min,
- pool.max_idle,
- )
- to_close.close()
+ pool._shrink_if_possible()