logger.info("waiting for pool %r initialization", self.name)
if not self._pool_full_event.wait(timeout):
self.close() # stop all the threads
- raise PoolTimeout(f"pool initialization incomplete after {timeout} sec")
+ raise PoolTimeout(
+ f"pool initialization incomplete after {timeout} sec"
+ ) from None
with self._lock:
assert self._pool_full_event
logger.info("connection requested from %r", self.name)
self._stats[self._REQUESTS_NUM] += 1
+ self._check_open_getconn()
+
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
with self._lock:
- self._check_open_getconn()
conn = self._get_ready_connection(timeout)
if not conn:
# No connection available: put the client in the waiting queue
self._check_pool_putconn(conn)
logger.info("returning connection to %r", self.name)
-
if self._maybe_close_connection(conn):
return
def _start_workers(self) -> None:
self._sched_runner = threading.Thread(
- target=self._sched.run,
- name=f"{self.name}-scheduler",
- daemon=True,
+ target=self._sched.run, name=f"{self.name}-scheduler", daemon=True
)
assert not self._workers
for i in range(self.num_workers):
if isinstance(task, StopWorker):
logger.debug(
- "terminating working thread %s",
- threading.current_thread().name,
+ "terminating working thread %s", threading.current_thread().name
)
return
kwargs["connect_timeout"] = max(round(timeout), 1)
t0 = monotonic()
try:
- conn: CT = self.connection_class.connect( # type: ignore
- self.conninfo, **kwargs
- )
+ conn: CT = cast(CT, self.connection_class.connect(self.conninfo, **kwargs))
except Exception:
self._stats[self._CONNECTIONS_ERRORS] += 1
raise
self._open()
async def wait(self, timeout: float = 30.0) -> None:
+ """
+ Wait for the pool to be full (with `min_size` connections) after creation.
+
+ Close the pool, and raise `PoolTimeout`, if not ready within *timeout*
+ sec.
+
+ Calling this method is not mandatory: you can try and use the pool
+ immediately after its creation. The first client will be served as soon
+ as a connection is ready. You can use this method if you prefer your
+ program to terminate in case the environment is not configured
+ properly, rather than trying to stay up the hardest it can.
+ """
self._check_open_getconn()
async with self._lock:
@asynccontextmanager
async def connection(self, timeout: Optional[float] = None) -> AsyncIterator[ACT]:
+ """Context manager to obtain a connection from the pool.
+
+ Return the connection immediately if available, otherwise wait up to
+ *timeout* or `self.timeout` seconds and throw `PoolTimeout` if a
+ connection is not available in time.
+
+ Upon context exit, return the connection to the pool. Apply the normal
+ :ref:`connection context behaviour <with-connection>` (commit/rollback
+ the transaction in case of success/error). If the connection is no more
+ in working state, replace it with a new one.
+ """
conn = await self.getconn(timeout=timeout)
try:
t0 = monotonic()
self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0))
async def getconn(self, timeout: Optional[float] = None) -> ACT:
+ """Obtain a connection from the pool.
+
+ You should preferably use `connection()`. Use this function only if
+ it is not possible to use the connection as context manager.
+
+ After using this function you *must* call a corresponding `putconn()`:
+ failing to do so will deplete the pool. A depleted pool is a sad pool:
+ you don't want a depleted pool.
+ """
logger.info("connection requested from %r", self.name)
self._stats[self._REQUESTS_NUM] += 1
return conn
async def _get_ready_connection(self, timeout: Optional[float]) -> Optional[ACT]:
+ """Return a connection, if the client deserves one."""
conn: Optional[ACT] = None
if self._pool:
# Take a connection ready out of the pool
def _maybe_grow_pool(self) -> None:
# Allow only one task at time to grow the pool (or returning
# connections might be starved).
- if self._nconns < self._max_size and not self._growing:
- self._nconns += 1
- logger.info("growing pool %r to %s", self.name, self._nconns)
- self._growing = True
- self.run_task(AddConnection(self, growing=True))
+ if self._nconns >= self._max_size or self._growing:
+ return
+ self._nconns += 1
+ logger.info("growing pool %r to %s", self.name, self._nconns)
+ self._growing = True
+ self.run_task(AddConnection(self, growing=True))
async def putconn(self, conn: ACT) -> None:
+ """Return a connection to the loving hands of its pool.
+
+ Use this function only paired with a `getconn()`. You don't need to use
+ it if you use the much more comfortable `connection()` context manager.
+ """
+ # Quick check to discard the wrong connection
self._check_pool_putconn(conn)
logger.info("returning connection to %r", self.name)
await self._return_connection(conn)
async def _maybe_close_connection(self, conn: ACT) -> bool:
+ """Close a returned connection if necessary.
+
+ Return `!True if the connection was closed.
+ """
# If the pool is closed just close the connection instead of returning
# it to the pool. For extra refcare remove the pool reference from it.
if not self._closed:
return True
async def open(self, wait: bool = False, timeout: float = 30.0) -> None:
+ """Open the pool by starting connecting and and accepting clients.
+
+ If *wait* is `!False`, return immediately and let the background worker
+ fill the pool if `min_size` > 0. Otherwise wait up to *timeout* seconds
+ for the requested number of connections to be ready (see `wait()` for
+ details).
+
+ It is safe to call `!open()` again on a pool already open (because the
+ method was already called, or because the pool context was entered, or
+ because the pool was initialized with *open* = `!True`) but you cannot
+ currently re-open a closed pool.
+ """
# Make sure the lock is created after there is an event loop
try:
self._lock
self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))
async def close(self, timeout: float = 5.0) -> None:
+ """Close the pool and make it unavailable to new clients.
+
+ All the waiting and future clients will fail to acquire a connection
+ with a `PoolClosed` exception. Currently used connections will not be
+ closed until returned to the pool.
+
+ Wait *timeout* seconds for threads to terminate their job, if positive.
+ If the timeout expires the pool is closed anyway, although it may raise
+ some warnings on exit.
+ """
if self._closed:
return
await self.close()
async def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
+ """Change the size of the pool during runtime."""
min_size, max_size = self._check_size(min_size, max_size)
ngrow = max(0, min_size - self._min_size)
self.run_task(AddConnection(self))
async def check(self) -> None:
+ """Verify the state of the connections currently in the pool.
+
+ Test each connection: if it works return it to the pool, otherwise
+ dispose of it and create a new one.
+ """
async with self._lock:
conns = list(self._pool)
self._pool.clear()
)
async def _connect(self, timeout: Optional[float] = None) -> ACT:
+ """Return a new connection configured for the pool."""
self._stats[self._CONNECTIONS_NUM] += 1
kwargs = self.kwargs
if timeout:
kwargs["connect_timeout"] = max(round(timeout), 1)
t0 = monotonic()
try:
- conn: ACT = await self.connection_class.connect(self.conninfo, **kwargs)
+ conn: ACT = cast(
+ ACT, await self.connection_class.connect(self.conninfo, **kwargs)
+ )
except Exception:
self._stats[self._CONNECTIONS_ERRORS] += 1
raise