]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor(pool): light refactor to align async and sync pool
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 1 Oct 2023 01:59:37 +0000 (03:59 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 11 Oct 2023 21:45:38 +0000 (23:45 +0200)
psycopg_pool/psycopg_pool/pool.py
psycopg_pool/psycopg_pool/pool_async.py

index fca93724ba9beb0a0be0985b1d8bc8de51e67a4a..6e5876eb73c46e64ff76a3ae336496b3334f4b26 100644 (file)
@@ -169,7 +169,9 @@ class ConnectionPool(Generic[CT], BasePool):
         logger.info("waiting for pool %r initialization", self.name)
         if not self._pool_full_event.wait(timeout):
             self.close()  # stop all the threads
-            raise PoolTimeout(f"pool initialization incomplete after {timeout} sec")
+            raise PoolTimeout(
+                f"pool initialization incomplete after {timeout} sec"
+            ) from None
 
         with self._lock:
             assert self._pool_full_event
@@ -213,10 +215,11 @@ class ConnectionPool(Generic[CT], BasePool):
         logger.info("connection requested from %r", self.name)
         self._stats[self._REQUESTS_NUM] += 1
 
+        self._check_open_getconn()
+
         # Critical section: decide here if there's a connection ready
         # or if the client needs to wait.
         with self._lock:
-            self._check_open_getconn()
             conn = self._get_ready_connection(timeout)
             if not conn:
                 # No connection available: put the client in the waiting queue
@@ -285,7 +288,6 @@ class ConnectionPool(Generic[CT], BasePool):
         self._check_pool_putconn(conn)
 
         logger.info("returning connection to %r", self.name)
-
         if self._maybe_close_connection(conn):
             return
 
@@ -342,9 +344,7 @@ class ConnectionPool(Generic[CT], BasePool):
 
     def _start_workers(self) -> None:
         self._sched_runner = threading.Thread(
-            target=self._sched.run,
-            name=f"{self.name}-scheduler",
-            daemon=True,
+            target=self._sched.run, name=f"{self.name}-scheduler", daemon=True
         )
         assert not self._workers
         for i in range(self.num_workers):
@@ -544,8 +544,7 @@ class ConnectionPool(Generic[CT], BasePool):
 
             if isinstance(task, StopWorker):
                 logger.debug(
-                    "terminating working thread %s",
-                    threading.current_thread().name,
+                    "terminating working thread %s", threading.current_thread().name
                 )
                 return
 
@@ -569,9 +568,7 @@ class ConnectionPool(Generic[CT], BasePool):
             kwargs["connect_timeout"] = max(round(timeout), 1)
         t0 = monotonic()
         try:
-            conn: CT = self.connection_class.connect(  # type: ignore
-                self.conninfo, **kwargs
-            )
+            conn: CT = cast(CT, self.connection_class.connect(self.conninfo, **kwargs))
         except Exception:
             self._stats[self._CONNECTIONS_ERRORS] += 1
             raise
index 9bd1766dbfb5e787c788e3e8da4e1a756f6493f6..5daced6b32edaffdc5ddebcef34d524c1e5c26ea 100644 (file)
@@ -143,6 +143,18 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             self._open()
 
     async def wait(self, timeout: float = 30.0) -> None:
+        """
+        Wait for the pool to be full (with `min_size` connections) after creation.
+
+        Close the pool, and raise `PoolTimeout`, if not ready within *timeout*
+        sec.
+
+        Calling this method is not mandatory: you can try and use the pool
+        immediately after its creation. The first client will be served as soon
+        as a connection is ready. You can use this method if you prefer your
+        program to terminate in case the environment is not configured
+        properly, rather than trying to stay up the hardest it can.
+        """
         self._check_open_getconn()
 
         async with self._lock:
@@ -168,6 +180,17 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
 
     @asynccontextmanager
     async def connection(self, timeout: Optional[float] = None) -> AsyncIterator[ACT]:
+        """Context manager to obtain a connection from the pool.
+
+        Return the connection immediately if available, otherwise wait up to
+        *timeout* or `self.timeout` seconds and throw `PoolTimeout` if a
+        connection is not available in time.
+
+        Upon context exit, return the connection to the pool. Apply the normal
+        :ref:`connection context behaviour <with-connection>` (commit/rollback
+        the transaction in case of success/error). If the connection is no more
+        in working state, replace it with a new one.
+        """
         conn = await self.getconn(timeout=timeout)
         try:
             t0 = monotonic()
@@ -179,6 +202,15 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0))
 
     async def getconn(self, timeout: Optional[float] = None) -> ACT:
+        """Obtain a connection from the pool.
+
+        You should preferably use `connection()`. Use this function only if
+        it is not possible to use the connection as context manager.
+
+        After using this function you *must* call a corresponding `putconn()`:
+        failing to do so will deplete the pool. A depleted pool is a sad pool:
+        you don't want a depleted pool.
+        """
         logger.info("connection requested from %r", self.name)
         self._stats[self._REQUESTS_NUM] += 1
 
@@ -220,6 +252,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         return conn
 
     async def _get_ready_connection(self, timeout: Optional[float]) -> Optional[ACT]:
+        """Return a connection, if the client deserves one."""
         conn: Optional[ACT] = None
         if self._pool:
             # Take a connection ready out of the pool
@@ -237,13 +270,20 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
     def _maybe_grow_pool(self) -> None:
         # Allow only one task at time to grow the pool (or returning
         # connections might be starved).
-        if self._nconns < self._max_size and not self._growing:
-            self._nconns += 1
-            logger.info("growing pool %r to %s", self.name, self._nconns)
-            self._growing = True
-            self.run_task(AddConnection(self, growing=True))
+        if self._nconns >= self._max_size or self._growing:
+            return
+        self._nconns += 1
+        logger.info("growing pool %r to %s", self.name, self._nconns)
+        self._growing = True
+        self.run_task(AddConnection(self, growing=True))
 
     async def putconn(self, conn: ACT) -> None:
+        """Return a connection to the loving hands of its pool.
+
+        Use this function only paired with a `getconn()`. You don't need to use
+        it if you use the much more comfortable `connection()` context manager.
+        """
+        # Quick check to discard the wrong connection
         self._check_pool_putconn(conn)
 
         logger.info("returning connection to %r", self.name)
@@ -257,6 +297,10 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             await self._return_connection(conn)
 
     async def _maybe_close_connection(self, conn: ACT) -> bool:
+        """Close a returned connection if necessary.
+
+        Return `!True if the connection was closed.
+        """
         # If the pool is closed just close the connection instead of returning
         # it to the pool. For extra refcare remove the pool reference from it.
         if not self._closed:
@@ -267,6 +311,18 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         return True
 
     async def open(self, wait: bool = False, timeout: float = 30.0) -> None:
+        """Open the pool by starting connecting and and accepting clients.
+
+        If *wait* is `!False`, return immediately and let the background worker
+        fill the pool if `min_size` > 0. Otherwise wait up to *timeout* seconds
+        for the requested number of connections to be ready (see `wait()` for
+        details).
+
+        It is safe to call `!open()` again on a pool already open (because the
+        method was already called, or because the pool context was entered, or
+        because the pool was initialized with *open* = `!True`) but you cannot
+        currently re-open a closed pool.
+        """
         # Make sure the lock is created after there is an event loop
         try:
             self._lock
@@ -325,6 +381,16 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))
 
     async def close(self, timeout: float = 5.0) -> None:
+        """Close the pool and make it unavailable to new clients.
+
+        All the waiting and future clients will fail to acquire a connection
+        with a `PoolClosed` exception. Currently used connections will not be
+        closed until returned to the pool.
+
+        Wait *timeout* seconds for threads to terminate their job, if positive.
+        If the timeout expires the pool is closed anyway, although it may raise
+        some warnings on exit.
+        """
         if self._closed:
             return
 
@@ -393,6 +459,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         await self.close()
 
     async def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
+        """Change the size of the pool during runtime."""
         min_size, max_size = self._check_size(min_size, max_size)
 
         ngrow = max(0, min_size - self._min_size)
@@ -412,6 +479,11 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             self.run_task(AddConnection(self))
 
     async def check(self) -> None:
+        """Verify the state of the connections currently in the pool.
+
+        Test each connection: if it works return it to the pool, otherwise
+        dispose of it and create a new one.
+        """
         async with self._lock:
             conns = list(self._pool)
             self._pool.clear()
@@ -491,6 +563,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
                 )
 
     async def _connect(self, timeout: Optional[float] = None) -> ACT:
+        """Return a new connection configured for the pool."""
         self._stats[self._CONNECTIONS_NUM] += 1
         kwargs = self.kwargs
         if timeout:
@@ -498,7 +571,9 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             kwargs["connect_timeout"] = max(round(timeout), 1)
         t0 = monotonic()
         try:
-            conn: ACT = await self.connection_class.connect(self.conninfo, **kwargs)
+            conn: ACT = cast(
+                ACT, await self.connection_class.connect(self.conninfo, **kwargs)
+            )
         except Exception:
             self._stats[self._CONNECTIONS_ERRORS] += 1
             raise