]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Grow the pool size one connection at time
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 11 Mar 2021 20:11:48 +0000 (21:11 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 12 Mar 2021 04:07:25 +0000 (05:07 +0100)
Not doing so saturates the worker processes which may not be able to do
other maintenance, including returning connections to the pool. It also
helps to limit the amount of growth in a sudden spike.
o

psycopg3/psycopg3/pool/async_pool.py
psycopg3/psycopg3/pool/base.py
psycopg3/psycopg3/pool/pool.py
tests/pool/test_pool.py
tests/pool/test_pool_async.py

index dbda524a5c969df048ed5d646b4a1d30342323c1..9f5b1cb26f1935ef891ef1ef8ce3cfb2f4e7d82d 100644 (file)
@@ -164,13 +164,15 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
                 self._waiting.append(pos)
                 self._stats[self._REQUESTS_QUEUED] += 1
 
-                # If there is space for the pool to grow, let's do it
-                if self._nconns < self._maxconn:
+                # Allow only one thread at time to grow the pool (or returning
+                # connections might be starved).
+                if self._nconns < self._maxconn and not self._growing:
                     self._nconns += 1
                     logger.info(
                         "growing pool %r to %s", self.name, self._nconns
                     )
-                    self.run_task(AddConnection(self))
+                    self._growing = True
+                    self.run_task(AddConnection(self, growing=True))
 
         # If we are in the waiting queue, wait to be assigned a connection
         # (outside the critical section, so only the waiting client is locked)
@@ -408,7 +410,7 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
         return conn
 
     async def _add_connection(
-        self, attempt: Optional[ConnectionAttempt]
+        self, attempt: Optional[ConnectionAttempt], growing: bool = False
     ) -> None:
         """Try to connect and add the connection to the pool.
 
@@ -439,10 +441,23 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
             else:
                 attempt.update_delay(now)
                 await self.schedule_task(
-                    AddConnection(self, attempt), attempt.delay
+                    AddConnection(self, attempt, growing=growing),
+                    attempt.delay,
                 )
-        else:
-            await self._add_to_pool(conn)
+            return
+
+        logger.info("adding new connection to the pool")
+        await self._add_to_pool(conn)
+        if growing:
+            async with self._lock:
+                if self._nconns < self._maxconn and self._waiting:
+                    self._nconns += 1
+                    logger.info(
+                        "growing pool %r to %s", self.name, self._nconns
+                    )
+                    self.run_task(AddConnection(self, growing=True))
+                else:
+                    self._growing = False
 
     async def _return_connection(self, conn: AsyncConnection) -> None:
         """
@@ -690,12 +705,14 @@ class AddConnection(MaintenanceTask):
         self,
         pool: "AsyncConnectionPool",
         attempt: Optional["ConnectionAttempt"] = None,
+        growing: bool = False,
     ):
         super().__init__(pool)
         self.attempt = attempt
+        self.growing = growing
 
     async def _run(self, pool: "AsyncConnectionPool") -> None:
-        await pool._add_connection(self.attempt)
+        await pool._add_connection(self.attempt, growing=self.growing)
 
 
 class ReturnConnection(MaintenanceTask):
index b4c94f72c53406b1406501c797c92f0c9a95aaff..4fba7862e4781a7f7bd60d19a9d9b366a958ed7a 100644 (file)
@@ -93,6 +93,12 @@ class BasePool(Generic[ConnectionType]):
         # max_idle interval they weren't all used.
         self._nconns_min = minconn
 
+        # Flag to allow the pool to grow only one connection at time. In case
+        # of spike, if threads are allowed to grow in parallel and connection
+        # time is slow, there won't be any thread available to return the
+        # connections to the pool.
+        self._growing = False
+
         # _close should be the last property to be set in the state
         # to avoid warning on __del__ in case __init__ fails.
         self._closed = False
index a7d6c90ec10fcf8b476f117df332472414362aad..85e07f003cf6848fdb3804057f8ff8fe566c91df 100644 (file)
@@ -176,12 +176,15 @@ class ConnectionPool(BasePool[Connection]):
                 self._stats[self._REQUESTS_QUEUED] += 1
 
                 # If there is space for the pool to grow, let's do it
-                if self._nconns < self._maxconn:
+                # Allow only one thread at time to grow the pool (or returning
+                # connections might be starved).
+                if self._nconns < self._maxconn and not self._growing:
                     self._nconns += 1
                     logger.info(
                         "growing pool %r to %s", self.name, self._nconns
                     )
-                    self.run_task(AddConnection(self))
+                    self._growing = True
+                    self.run_task(AddConnection(self, growing=True))
 
         # If we are in the waiting queue, wait to be assigned a connection
         # (outside the critical section, so only the waiting client is locked)
@@ -424,7 +427,9 @@ class ConnectionPool(BasePool[Connection]):
         )
         return conn
 
-    def _add_connection(self, attempt: Optional[ConnectionAttempt]) -> None:
+    def _add_connection(
+        self, attempt: Optional[ConnectionAttempt], growing: bool = False
+    ) -> None:
         """Try to connect and add the connection to the pool.
 
         If failed, reschedule a new attempt in the future for a few times, then
@@ -453,9 +458,24 @@ class ConnectionPool(BasePool[Connection]):
                 self.reconnect_failed()
             else:
                 attempt.update_delay(now)
-                self.schedule_task(AddConnection(self, attempt), attempt.delay)
-        else:
-            self._add_to_pool(conn)
+                self.schedule_task(
+                    AddConnection(self, attempt, growing=growing),
+                    attempt.delay,
+                )
+            return
+
+        logger.info("adding new connection to the pool")
+        self._add_to_pool(conn)
+        if growing:
+            with self._lock:
+                if self._nconns < self._maxconn and self._waiting:
+                    self._nconns += 1
+                    logger.info(
+                        "growing pool %r to %s", self.name, self._nconns
+                    )
+                    self.run_task(AddConnection(self, growing=True))
+                else:
+                    self._growing = False
 
     def _return_connection(self, conn: Connection) -> None:
         """
@@ -707,12 +727,14 @@ class AddConnection(MaintenanceTask):
         self,
         pool: "ConnectionPool",
         attempt: Optional["ConnectionAttempt"] = None,
+        growing: bool = False,
     ):
         super().__init__(pool)
         self.attempt = attempt
+        self.growing = growing
 
     def _run(self, pool: "ConnectionPool") -> None:
-        pool._add_connection(self.attempt)
+        pool._add_connection(self.attempt, growing=self.growing)
 
 
 class ReturnConnection(MaintenanceTask):
index ec71b04ad9fd5994ddd992c4924f018946a2a139..8be90ca3710afa95ac0939b5c6149756829790da 100644 (file)
@@ -609,7 +609,7 @@ def test_grow(dsn, monkeypatch, retries):
                 [t.start() for t in ts]
                 [t.join() for t in ts]
 
-            want_times = [0.2, 0.2, 0.3, 0.3, 0.4, 0.4]
+            want_times = [0.2, 0.2, 0.3, 0.4, 0.4, 0.4]
             times = [item[1] for item in results]
             for got, want in zip(times, want_times):
                 assert got == pytest.approx(want, 0.1), times
@@ -921,6 +921,27 @@ def test_stats_connect(dsn, proxy, monkeypatch):
         assert stats["connections_lost"] == 3
 
 
+@pytest.mark.slow
+def test_spike(dsn, monkeypatch):
+    # Inspired to https://github.com/brettwooldridge/HikariCP/blob/dev/
+    # documents/Welcome-To-The-Jungle.md
+    delay_connection(monkeypatch, 0.15)
+
+    def worker():
+        with p.connection():
+            sleep(0.002)
+
+    with pool.ConnectionPool(dsn, minconn=5, maxconn=10) as p:
+        p.wait()
+
+        ts = [Thread(target=worker) for i in range(50)]
+        [t.start() for t in ts]
+        [t.join() for t in ts]
+        p.wait()
+
+        assert len(p._pool) < 7
+
+
 def delay_connection(monkeypatch, sec):
     """
     Return a _connect_gen function delayed by the amount of seconds
index 95404af551ae68a1e3a2b1e3f7680be803bf3180..59159496a17bc00e08c7fa9c645abd7000cd000e 100644 (file)
@@ -615,7 +615,7 @@ async def test_grow(dsn, monkeypatch, retries):
                 ts = [create_task(worker(i)) for i in range(6)]
                 await asyncio.gather(*ts)
 
-            want_times = [0.2, 0.2, 0.3, 0.3, 0.4, 0.4]
+            want_times = [0.2, 0.2, 0.3, 0.4, 0.4, 0.4]
             times = [item[1] for item in results]
             for got, want in zip(times, want_times):
                 assert got == pytest.approx(want, 0.1), times
@@ -920,6 +920,26 @@ async def test_stats_connect(dsn, proxy, monkeypatch):
         assert stats["connections_lost"] == 3
 
 
+@pytest.mark.slow
+async def test_spike(dsn, monkeypatch):
+    # Inspired to https://github.com/brettwooldridge/HikariCP/blob/dev/
+    # documents/Welcome-To-The-Jungle.md
+    delay_connection(monkeypatch, 0.15)
+
+    async def worker():
+        async with p.connection():
+            await asyncio.sleep(0.002)
+
+    async with pool.AsyncConnectionPool(dsn, minconn=5, maxconn=10) as p:
+        await p.wait()
+
+        ts = [create_task(worker()) for i in range(50)]
+        await asyncio.gather(*ts)
+        await p.wait()
+
+        assert len(p._pool) < 7
+
+
 def delay_connection(monkeypatch, sec):
     """
     Return a _connect_gen function delayed by the amount of seconds