self._waiting.append(pos)
# If there is space for the pool to grow, let's do it
- if self._nconns < self.maxconn:
+ if self._nconns < self._maxconn:
self._nconns += 1
logger.info(
"growing pool %r to %s", self.name, self._nconns
timeout,
)
+ async def resize(
+ self, minconn: int, maxconn: Optional[int] = None
+ ) -> None:
+ if maxconn is None:
+ maxconn = minconn
+ if maxconn < minconn:
+ raise ValueError("maxconn must be greater or equal than minconn")
+
+ ngrow = max(0, minconn - self._minconn)
+
+ logger.info(
+ "resizing %r to minconn=%s maxconn=%s", self.name, minconn, maxconn
+ )
+ async with self._lock:
+ self._minconn = minconn
+ self._maxconn = maxconn
+ self._nconns += ngrow
+
+ for i in range(ngrow):
+ self.run_task(tasks.AddConnection(self))
+
async def configure(self, conn: AsyncConnection) -> None:
"""Configure a connection after creation."""
if self._configure:
self._nconns_min = len(self._pool)
# If the pool can shrink and connections were unused, drop one
- if self._nconns > self.minconn and nconns_min > 0:
+ if self._nconns > self._minconn and nconns_min > 0:
to_close = self._pool.popleft()
self._nconns -= 1
if maxconn is None:
maxconn = minconn
if maxconn < minconn:
- raise ValueError(
- f"can't create {self.__class__.__name__}"
- f" with maxconn={maxconn} < minconn={minconn}"
- )
+ raise ValueError("maxconn must be greater or equal than minconn")
if not name:
num = BasePool._num_pool = BasePool._num_pool + 1
name = f"pool-{num}"
self._reconnect_failed: Callable[["BasePool[ConnectionType]"], None]
self._reconnect_failed = reconnect_failed or (lambda pool: None)
self.name = name
- self.minconn = minconn
- self.maxconn = maxconn
+ self._minconn = minconn
+ self._maxconn = maxconn
self.timeout = timeout
self.reconnect_timeout = reconnect_timeout
self.max_idle = max_idle
self.run_task(tasks.AddConnection(self))
# Schedule a task to shrink the pool if connections over minconn have
- # remained unused. However if the pool can't grow don't bother.
- if maxconn > minconn:
- self.schedule_task(tasks.ShrinkPool(self), self.max_idle)
+ # remained unused.
+ self.schedule_task(tasks.ShrinkPool(self), self.max_idle)
def __repr__(self) -> str:
return (
for i in range(len(self._workers)):
self.run_task(tasks.StopWorker(self))
+ @property
+ def minconn(self) -> int:
+ return self._minconn
+
+ @property
+ def maxconn(self) -> int:
+ return self._maxconn
+
@property
def closed(self) -> bool:
"""`!True` if the pool is closed."""
self._waiting.append(pos)
# If there is space for the pool to grow, let's do it
- if self._nconns < self.maxconn:
+ if self._nconns < self._maxconn:
self._nconns += 1
logger.info(
"growing pool %r to %s", self.name, self._nconns
timeout,
)
+ def resize(self, minconn: int, maxconn: Optional[int] = None) -> None:
+ if maxconn is None:
+ maxconn = minconn
+ if maxconn < minconn:
+ raise ValueError("maxconn must be greater or equal than minconn")
+
+ ngrow = max(0, minconn - self._minconn)
+
+ logger.info(
+ "resizing %r to minconn=%s maxconn=%s", self.name, minconn, maxconn
+ )
+ with self._lock:
+ self._minconn = minconn
+ self._maxconn = maxconn
+ self._nconns += ngrow
+
+ for i in range(ngrow):
+ self.run_task(tasks.AddConnection(self))
+
def configure(self, conn: Connection) -> None:
"""Configure a connection after creation."""
self._configure(conn)
self._nconns_min = len(self._pool)
# If the pool can shrink and connections were unused, drop one
- if self._nconns > self.minconn and nconns_min > 0:
+ if self._nconns > self._minconn and nconns_min > 0:
to_close = self._pool.popleft()
self._nconns -= 1
assert set(counts.values()) == set([2])
+@pytest.mark.slow
+def test_resize(dsn):
+ p = pool.ConnectionPool(dsn, minconn=2, max_idle=0.2)
+ size = []
+
+ def sampler():
+ sleep(0.05) # ensure sampling happens after shrink check
+ while True:
+ sleep(0.2)
+ if p.closed:
+ break
+ size.append(len(p._pool))
+
+ def client(t):
+ with p.connection() as conn:
+ conn.execute("select pg_sleep(%s)", [t])
+
+ s = Thread(target=sampler)
+ s.start()
+
+ sleep(0.3)
+
+ c = Thread(target=client, args=(0.4,))
+ c.start()
+
+ sleep(0.2)
+ p.resize(4)
+ assert p.minconn == 4
+ assert p.maxconn == 4
+
+ sleep(0.4)
+ p.resize(2)
+ assert p.minconn == 2
+ assert p.maxconn == 2
+
+ sleep(0.6)
+ p.close()
+ s.join()
+
+ assert size == [2, 1, 3, 4, 3, 2, 2]
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds
assert set(counts.values()) == set([2])
+@pytest.mark.slow
+async def test_resize(dsn):
+ p = pool.AsyncConnectionPool(dsn, minconn=2, max_idle=0.2)
+ size = []
+
+ async def sampler():
+ await asyncio.sleep(0.05) # ensure sampling happens after shrink check
+ while True:
+ await asyncio.sleep(0.2)
+ if p.closed:
+ break
+ size.append(len(p._pool))
+
+ async def client(t):
+ async with p.connection() as conn:
+ await conn.execute("select pg_sleep(%s)", [t])
+
+ s = create_task(sampler())
+
+ await asyncio.sleep(0.3)
+
+ c = create_task(client(0.4))
+
+ await asyncio.sleep(0.2)
+ await p.resize(4)
+ assert p.minconn == 4
+ assert p.maxconn == 4
+
+ await asyncio.sleep(0.4)
+ await p.resize(2)
+ assert p.minconn == 2
+ assert p.maxconn == 2
+
+ await asyncio.sleep(0.6)
+ await p.close()
+ await asyncio.gather(s, c)
+
+ assert size == [2, 1, 3, 4, 3, 2, 2]
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds