From: Daniele Varrazzo Date: Sun, 1 Oct 2023 01:59:37 +0000 (+0200) Subject: refactor(pool): light refactor to align async and sync pool X-Git-Tag: pool-3.2.0~12^2~30 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=cbd6d89c006a3efe75f74afe43e1ad0372d4c783;p=thirdparty%2Fpsycopg.git refactor(pool): light refactor to align async and sync pool --- diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index fca93724b..6e5876eb7 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -169,7 +169,9 @@ class ConnectionPool(Generic[CT], BasePool): logger.info("waiting for pool %r initialization", self.name) if not self._pool_full_event.wait(timeout): self.close() # stop all the threads - raise PoolTimeout(f"pool initialization incomplete after {timeout} sec") + raise PoolTimeout( + f"pool initialization incomplete after {timeout} sec" + ) from None with self._lock: assert self._pool_full_event @@ -213,10 +215,11 @@ class ConnectionPool(Generic[CT], BasePool): logger.info("connection requested from %r", self.name) self._stats[self._REQUESTS_NUM] += 1 + self._check_open_getconn() + # Critical section: decide here if there's a connection ready # or if the client needs to wait. with self._lock: - self._check_open_getconn() conn = self._get_ready_connection(timeout) if not conn: # No connection available: put the client in the waiting queue @@ -285,7 +288,6 @@ class ConnectionPool(Generic[CT], BasePool): self._check_pool_putconn(conn) logger.info("returning connection to %r", self.name) - if self._maybe_close_connection(conn): return @@ -342,9 +344,7 @@ class ConnectionPool(Generic[CT], BasePool): def _start_workers(self) -> None: self._sched_runner = threading.Thread( - target=self._sched.run, - name=f"{self.name}-scheduler", - daemon=True, + target=self._sched.run, name=f"{self.name}-scheduler", daemon=True ) assert not self._workers for i in range(self.num_workers): @@ -544,8 +544,7 @@ class ConnectionPool(Generic[CT], BasePool): if isinstance(task, StopWorker): logger.debug( - "terminating working thread %s", - threading.current_thread().name, + "terminating working thread %s", threading.current_thread().name ) return @@ -569,9 +568,7 @@ class ConnectionPool(Generic[CT], BasePool): kwargs["connect_timeout"] = max(round(timeout), 1) t0 = monotonic() try: - conn: CT = self.connection_class.connect( # type: ignore - self.conninfo, **kwargs - ) + conn: CT = cast(CT, self.connection_class.connect(self.conninfo, **kwargs)) except Exception: self._stats[self._CONNECTIONS_ERRORS] += 1 raise diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index 9bd1766db..5daced6b3 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -143,6 +143,18 @@ class AsyncConnectionPool(Generic[ACT], BasePool): self._open() async def wait(self, timeout: float = 30.0) -> None: + """ + Wait for the pool to be full (with `min_size` connections) after creation. + + Close the pool, and raise `PoolTimeout`, if not ready within *timeout* + sec. + + Calling this method is not mandatory: you can try and use the pool + immediately after its creation. The first client will be served as soon + as a connection is ready. You can use this method if you prefer your + program to terminate in case the environment is not configured + properly, rather than trying to stay up the hardest it can. + """ self._check_open_getconn() async with self._lock: @@ -168,6 +180,17 @@ class AsyncConnectionPool(Generic[ACT], BasePool): @asynccontextmanager async def connection(self, timeout: Optional[float] = None) -> AsyncIterator[ACT]: + """Context manager to obtain a connection from the pool. + + Return the connection immediately if available, otherwise wait up to + *timeout* or `self.timeout` seconds and throw `PoolTimeout` if a + connection is not available in time. + + Upon context exit, return the connection to the pool. Apply the normal + :ref:`connection context behaviour ` (commit/rollback + the transaction in case of success/error). If the connection is no more + in working state, replace it with a new one. + """ conn = await self.getconn(timeout=timeout) try: t0 = monotonic() @@ -179,6 +202,15 @@ class AsyncConnectionPool(Generic[ACT], BasePool): self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0)) async def getconn(self, timeout: Optional[float] = None) -> ACT: + """Obtain a connection from the pool. + + You should preferably use `connection()`. Use this function only if + it is not possible to use the connection as context manager. + + After using this function you *must* call a corresponding `putconn()`: + failing to do so will deplete the pool. A depleted pool is a sad pool: + you don't want a depleted pool. + """ logger.info("connection requested from %r", self.name) self._stats[self._REQUESTS_NUM] += 1 @@ -220,6 +252,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): return conn async def _get_ready_connection(self, timeout: Optional[float]) -> Optional[ACT]: + """Return a connection, if the client deserves one.""" conn: Optional[ACT] = None if self._pool: # Take a connection ready out of the pool @@ -237,13 +270,20 @@ class AsyncConnectionPool(Generic[ACT], BasePool): def _maybe_grow_pool(self) -> None: # Allow only one task at time to grow the pool (or returning # connections might be starved). - if self._nconns < self._max_size and not self._growing: - self._nconns += 1 - logger.info("growing pool %r to %s", self.name, self._nconns) - self._growing = True - self.run_task(AddConnection(self, growing=True)) + if self._nconns >= self._max_size or self._growing: + return + self._nconns += 1 + logger.info("growing pool %r to %s", self.name, self._nconns) + self._growing = True + self.run_task(AddConnection(self, growing=True)) async def putconn(self, conn: ACT) -> None: + """Return a connection to the loving hands of its pool. + + Use this function only paired with a `getconn()`. You don't need to use + it if you use the much more comfortable `connection()` context manager. + """ + # Quick check to discard the wrong connection self._check_pool_putconn(conn) logger.info("returning connection to %r", self.name) @@ -257,6 +297,10 @@ class AsyncConnectionPool(Generic[ACT], BasePool): await self._return_connection(conn) async def _maybe_close_connection(self, conn: ACT) -> bool: + """Close a returned connection if necessary. + + Return `!True if the connection was closed. + """ # If the pool is closed just close the connection instead of returning # it to the pool. For extra refcare remove the pool reference from it. if not self._closed: @@ -267,6 +311,18 @@ class AsyncConnectionPool(Generic[ACT], BasePool): return True async def open(self, wait: bool = False, timeout: float = 30.0) -> None: + """Open the pool by starting connecting and and accepting clients. + + If *wait* is `!False`, return immediately and let the background worker + fill the pool if `min_size` > 0. Otherwise wait up to *timeout* seconds + for the requested number of connections to be ready (see `wait()` for + details). + + It is safe to call `!open()` again on a pool already open (because the + method was already called, or because the pool context was entered, or + because the pool was initialized with *open* = `!True`) but you cannot + currently re-open a closed pool. + """ # Make sure the lock is created after there is an event loop try: self._lock @@ -325,6 +381,16 @@ class AsyncConnectionPool(Generic[ACT], BasePool): self.run_task(Schedule(self, ShrinkPool(self), self.max_idle)) async def close(self, timeout: float = 5.0) -> None: + """Close the pool and make it unavailable to new clients. + + All the waiting and future clients will fail to acquire a connection + with a `PoolClosed` exception. Currently used connections will not be + closed until returned to the pool. + + Wait *timeout* seconds for threads to terminate their job, if positive. + If the timeout expires the pool is closed anyway, although it may raise + some warnings on exit. + """ if self._closed: return @@ -393,6 +459,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): await self.close() async def resize(self, min_size: int, max_size: Optional[int] = None) -> None: + """Change the size of the pool during runtime.""" min_size, max_size = self._check_size(min_size, max_size) ngrow = max(0, min_size - self._min_size) @@ -412,6 +479,11 @@ class AsyncConnectionPool(Generic[ACT], BasePool): self.run_task(AddConnection(self)) async def check(self) -> None: + """Verify the state of the connections currently in the pool. + + Test each connection: if it works return it to the pool, otherwise + dispose of it and create a new one. + """ async with self._lock: conns = list(self._pool) self._pool.clear() @@ -491,6 +563,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): ) async def _connect(self, timeout: Optional[float] = None) -> ACT: + """Return a new connection configured for the pool.""" self._stats[self._CONNECTIONS_NUM] += 1 kwargs = self.kwargs if timeout: @@ -498,7 +571,9 @@ class AsyncConnectionPool(Generic[ACT], BasePool): kwargs["connect_timeout"] = max(round(timeout), 1) t0 = monotonic() try: - conn: ACT = await self.connection_class.connect(self.conninfo, **kwargs) + conn: ACT = cast( + ACT, await self.connection_class.connect(self.conninfo, **kwargs) + ) except Exception: self._stats[self._CONNECTIONS_ERRORS] += 1 raise