]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Add pool.resize()
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 27 Feb 2021 20:07:38 +0000 (21:07 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 12 Mar 2021 04:07:25 +0000 (05:07 +0100)
psycopg3/psycopg3/pool/async_pool.py
psycopg3/psycopg3/pool/base.py
psycopg3/psycopg3/pool/pool.py
tests/pool/test_pool.py
tests/pool/test_pool_async.py

index 45a5bbf7cd414e6ef90f6c8d795095212cf2ccd4..c1e34add87fbf5635d9f10405906cfb5557c2736 100644 (file)
@@ -128,7 +128,7 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
                 self._waiting.append(pos)
 
                 # If there is space for the pool to grow, let's do it
-                if self._nconns < self.maxconn:
+                if self._nconns < self._maxconn:
                     self._nconns += 1
                     logger.info(
                         "growing pool %r to %s", self.name, self._nconns
@@ -233,6 +233,27 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
                         timeout,
                     )
 
+    async def resize(
+        self, minconn: int, maxconn: Optional[int] = None
+    ) -> None:
+        if maxconn is None:
+            maxconn = minconn
+        if maxconn < minconn:
+            raise ValueError("maxconn must be greater or equal than minconn")
+
+        ngrow = max(0, minconn - self._minconn)
+
+        logger.info(
+            "resizing %r to minconn=%s maxconn=%s", self.name, minconn, maxconn
+        )
+        async with self._lock:
+            self._minconn = minconn
+            self._maxconn = maxconn
+            self._nconns += ngrow
+
+        for i in range(ngrow):
+            self.run_task(tasks.AddConnection(self))
+
     async def configure(self, conn: AsyncConnection) -> None:
         """Configure a connection after creation."""
         if self._configure:
@@ -370,7 +391,7 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
             self._nconns_min = len(self._pool)
 
             # If the pool can shrink and connections were unused, drop one
-            if self._nconns > self.minconn and nconns_min > 0:
+            if self._nconns > self._minconn and nconns_min > 0:
                 to_close = self._pool.popleft()
                 self._nconns -= 1
 
index 7397025ef25e4bef85d11da44040d4f1c918b48d..c999ac11371364aec3a4cbafaf2ccb7409eb428a 100644 (file)
@@ -45,10 +45,7 @@ class BasePool(Generic[ConnectionType]):
         if maxconn is None:
             maxconn = minconn
         if maxconn < minconn:
-            raise ValueError(
-                f"can't create {self.__class__.__name__}"
-                f" with maxconn={maxconn} < minconn={minconn}"
-            )
+            raise ValueError("maxconn must be greater or equal than minconn")
         if not name:
             num = BasePool._num_pool = BasePool._num_pool + 1
             name = f"pool-{num}"
@@ -61,8 +58,8 @@ class BasePool(Generic[ConnectionType]):
         self._reconnect_failed: Callable[["BasePool[ConnectionType]"], None]
         self._reconnect_failed = reconnect_failed or (lambda pool: None)
         self.name = name
-        self.minconn = minconn
-        self.maxconn = maxconn
+        self._minconn = minconn
+        self._maxconn = maxconn
         self.timeout = timeout
         self.reconnect_timeout = reconnect_timeout
         self.max_idle = max_idle
@@ -108,9 +105,8 @@ class BasePool(Generic[ConnectionType]):
             self.run_task(tasks.AddConnection(self))
 
         # Schedule a task to shrink the pool if connections over minconn have
-        # remained unused. However if the pool can't grow don't bother.
-        if maxconn > minconn:
-            self.schedule_task(tasks.ShrinkPool(self), self.max_idle)
+        # remained unused.
+        self.schedule_task(tasks.ShrinkPool(self), self.max_idle)
 
     def __repr__(self) -> str:
         return (
@@ -135,6 +131,14 @@ class BasePool(Generic[ConnectionType]):
         for i in range(len(self._workers)):
             self.run_task(tasks.StopWorker(self))
 
+    @property
+    def minconn(self) -> int:
+        return self._minconn
+
+    @property
+    def maxconn(self) -> int:
+        return self._maxconn
+
     @property
     def closed(self) -> bool:
         """`!True` if the pool is closed."""
index b6d0e9257d288bcbd0d3d8d537e4e700ffd931a4..c68a8615ec0a0e76e26ddd12849e614305357fe4 100644 (file)
@@ -111,7 +111,7 @@ class ConnectionPool(BasePool[Connection]):
                 self._waiting.append(pos)
 
                 # If there is space for the pool to grow, let's do it
-                if self._nconns < self.maxconn:
+                if self._nconns < self._maxconn:
                     self._nconns += 1
                     logger.info(
                         "growing pool %r to %s", self.name, self._nconns
@@ -215,6 +215,25 @@ class ConnectionPool(BasePool[Connection]):
                         timeout,
                     )
 
+    def resize(self, minconn: int, maxconn: Optional[int] = None) -> None:
+        if maxconn is None:
+            maxconn = minconn
+        if maxconn < minconn:
+            raise ValueError("maxconn must be greater or equal than minconn")
+
+        ngrow = max(0, minconn - self._minconn)
+
+        logger.info(
+            "resizing %r to minconn=%s maxconn=%s", self.name, minconn, maxconn
+        )
+        with self._lock:
+            self._minconn = minconn
+            self._maxconn = maxconn
+            self._nconns += ngrow
+
+        for i in range(ngrow):
+            self.run_task(tasks.AddConnection(self))
+
     def configure(self, conn: Connection) -> None:
         """Configure a connection after creation."""
         self._configure(conn)
@@ -349,7 +368,7 @@ class ConnectionPool(BasePool[Connection]):
             self._nconns_min = len(self._pool)
 
             # If the pool can shrink and connections were unused, drop one
-            if self._nconns > self.minconn and nconns_min > 0:
+            if self._nconns > self._minconn and nconns_min > 0:
                 to_close = self._pool.popleft()
                 self._nconns -= 1
 
index bb48f77c14380dfe7244126fdc39c41475d499c1..840b3b1e265be9dc31b1f14a1f556c3f633ae6e9 100644 (file)
@@ -608,6 +608,48 @@ def test_uniform_use(dsn):
     assert set(counts.values()) == set([2])
 
 
+@pytest.mark.slow
+def test_resize(dsn):
+    p = pool.ConnectionPool(dsn, minconn=2, max_idle=0.2)
+    size = []
+
+    def sampler():
+        sleep(0.05)  # ensure sampling happens after shrink check
+        while True:
+            sleep(0.2)
+            if p.closed:
+                break
+            size.append(len(p._pool))
+
+    def client(t):
+        with p.connection() as conn:
+            conn.execute("select pg_sleep(%s)", [t])
+
+    s = Thread(target=sampler)
+    s.start()
+
+    sleep(0.3)
+
+    c = Thread(target=client, args=(0.4,))
+    c.start()
+
+    sleep(0.2)
+    p.resize(4)
+    assert p.minconn == 4
+    assert p.maxconn == 4
+
+    sleep(0.4)
+    p.resize(2)
+    assert p.minconn == 2
+    assert p.maxconn == 2
+
+    sleep(0.6)
+    p.close()
+    s.join()
+
+    assert size == [2, 1, 3, 4, 3, 2, 2]
+
+
 def delay_connection(monkeypatch, sec):
     """
     Return a _connect_gen function delayed by the amount of seconds
index 5739c69f7753ec6e75ea84333fc9e5342361e238..71542f817601f4f4fed9a1fd77a896a00942de7d 100644 (file)
@@ -641,6 +641,46 @@ async def test_uniform_use(dsn):
     assert set(counts.values()) == set([2])
 
 
+@pytest.mark.slow
+async def test_resize(dsn):
+    p = pool.AsyncConnectionPool(dsn, minconn=2, max_idle=0.2)
+    size = []
+
+    async def sampler():
+        await asyncio.sleep(0.05)  # ensure sampling happens after shrink check
+        while True:
+            await asyncio.sleep(0.2)
+            if p.closed:
+                break
+            size.append(len(p._pool))
+
+    async def client(t):
+        async with p.connection() as conn:
+            await conn.execute("select pg_sleep(%s)", [t])
+
+    s = create_task(sampler())
+
+    await asyncio.sleep(0.3)
+
+    c = create_task(client(0.4))
+
+    await asyncio.sleep(0.2)
+    await p.resize(4)
+    assert p.minconn == 4
+    assert p.maxconn == 4
+
+    await asyncio.sleep(0.4)
+    await p.resize(2)
+    assert p.minconn == 2
+    assert p.maxconn == 2
+
+    await asyncio.sleep(0.6)
+    await p.close()
+    await asyncio.gather(s, c)
+
+    assert size == [2, 1, 3, 4, 3, 2, 2]
+
+
 def delay_connection(monkeypatch, sec):
     """
     Return a _connect_gen function delayed by the amount of seconds