self._waiting.append(pos)
self._stats[self._REQUESTS_QUEUED] += 1
- # If there is space for the pool to grow, let's do it
- if self._nconns < self._maxconn:
+ # Allow only one thread at time to grow the pool (or returning
+ # connections might be starved).
+ if self._nconns < self._maxconn and not self._growing:
self._nconns += 1
logger.info(
"growing pool %r to %s", self.name, self._nconns
)
- self.run_task(AddConnection(self))
+ self._growing = True
+ self.run_task(AddConnection(self, growing=True))
# If we are in the waiting queue, wait to be assigned a connection
# (outside the critical section, so only the waiting client is locked)
return conn
async def _add_connection(
- self, attempt: Optional[ConnectionAttempt]
+ self, attempt: Optional[ConnectionAttempt], growing: bool = False
) -> None:
"""Try to connect and add the connection to the pool.
else:
attempt.update_delay(now)
await self.schedule_task(
- AddConnection(self, attempt), attempt.delay
+ AddConnection(self, attempt, growing=growing),
+ attempt.delay,
)
- else:
- await self._add_to_pool(conn)
+ return
+
+ logger.info("adding new connection to the pool")
+ await self._add_to_pool(conn)
+ if growing:
+ async with self._lock:
+ if self._nconns < self._maxconn and self._waiting:
+ self._nconns += 1
+ logger.info(
+ "growing pool %r to %s", self.name, self._nconns
+ )
+ self.run_task(AddConnection(self, growing=True))
+ else:
+ self._growing = False
async def _return_connection(self, conn: AsyncConnection) -> None:
"""
self,
pool: "AsyncConnectionPool",
attempt: Optional["ConnectionAttempt"] = None,
+ growing: bool = False,
):
super().__init__(pool)
self.attempt = attempt
+ self.growing = growing
async def _run(self, pool: "AsyncConnectionPool") -> None:
- await pool._add_connection(self.attempt)
+ await pool._add_connection(self.attempt, growing=self.growing)
class ReturnConnection(MaintenanceTask):
# max_idle interval they weren't all used.
self._nconns_min = minconn
+ # Flag to allow the pool to grow only one connection at time. In case
+ # of spike, if threads are allowed to grow in parallel and connection
+ # time is slow, there won't be any thread available to return the
+ # connections to the pool.
+ self._growing = False
+
# _close should be the last property to be set in the state
# to avoid warning on __del__ in case __init__ fails.
self._closed = False
self._stats[self._REQUESTS_QUEUED] += 1
# If there is space for the pool to grow, let's do it
- if self._nconns < self._maxconn:
+ # Allow only one thread at time to grow the pool (or returning
+ # connections might be starved).
+ if self._nconns < self._maxconn and not self._growing:
self._nconns += 1
logger.info(
"growing pool %r to %s", self.name, self._nconns
)
- self.run_task(AddConnection(self))
+ self._growing = True
+ self.run_task(AddConnection(self, growing=True))
# If we are in the waiting queue, wait to be assigned a connection
# (outside the critical section, so only the waiting client is locked)
)
return conn
- def _add_connection(self, attempt: Optional[ConnectionAttempt]) -> None:
+ def _add_connection(
+ self, attempt: Optional[ConnectionAttempt], growing: bool = False
+ ) -> None:
"""Try to connect and add the connection to the pool.
If failed, reschedule a new attempt in the future for a few times, then
self.reconnect_failed()
else:
attempt.update_delay(now)
- self.schedule_task(AddConnection(self, attempt), attempt.delay)
- else:
- self._add_to_pool(conn)
+ self.schedule_task(
+ AddConnection(self, attempt, growing=growing),
+ attempt.delay,
+ )
+ return
+
+ logger.info("adding new connection to the pool")
+ self._add_to_pool(conn)
+ if growing:
+ with self._lock:
+ if self._nconns < self._maxconn and self._waiting:
+ self._nconns += 1
+ logger.info(
+ "growing pool %r to %s", self.name, self._nconns
+ )
+ self.run_task(AddConnection(self, growing=True))
+ else:
+ self._growing = False
def _return_connection(self, conn: Connection) -> None:
"""
self,
pool: "ConnectionPool",
attempt: Optional["ConnectionAttempt"] = None,
+ growing: bool = False,
):
super().__init__(pool)
self.attempt = attempt
+ self.growing = growing
def _run(self, pool: "ConnectionPool") -> None:
- pool._add_connection(self.attempt)
+ pool._add_connection(self.attempt, growing=self.growing)
class ReturnConnection(MaintenanceTask):
[t.start() for t in ts]
[t.join() for t in ts]
- want_times = [0.2, 0.2, 0.3, 0.3, 0.4, 0.4]
+ want_times = [0.2, 0.2, 0.3, 0.4, 0.4, 0.4]
times = [item[1] for item in results]
for got, want in zip(times, want_times):
assert got == pytest.approx(want, 0.1), times
assert stats["connections_lost"] == 3
+@pytest.mark.slow
+def test_spike(dsn, monkeypatch):
+ # Inspired to https://github.com/brettwooldridge/HikariCP/blob/dev/
+ # documents/Welcome-To-The-Jungle.md
+ delay_connection(monkeypatch, 0.15)
+
+ def worker():
+ with p.connection():
+ sleep(0.002)
+
+ with pool.ConnectionPool(dsn, minconn=5, maxconn=10) as p:
+ p.wait()
+
+ ts = [Thread(target=worker) for i in range(50)]
+ [t.start() for t in ts]
+ [t.join() for t in ts]
+ p.wait()
+
+ assert len(p._pool) < 7
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds
ts = [create_task(worker(i)) for i in range(6)]
await asyncio.gather(*ts)
- want_times = [0.2, 0.2, 0.3, 0.3, 0.4, 0.4]
+ want_times = [0.2, 0.2, 0.3, 0.4, 0.4, 0.4]
times = [item[1] for item in results]
for got, want in zip(times, want_times):
assert got == pytest.approx(want, 0.1), times
assert stats["connections_lost"] == 3
+@pytest.mark.slow
+async def test_spike(dsn, monkeypatch):
+ # Inspired to https://github.com/brettwooldridge/HikariCP/blob/dev/
+ # documents/Welcome-To-The-Jungle.md
+ delay_connection(monkeypatch, 0.15)
+
+ async def worker():
+ async with p.connection():
+ await asyncio.sleep(0.002)
+
+ async with pool.AsyncConnectionPool(dsn, minconn=5, maxconn=10) as p:
+ await p.wait()
+
+ ts = [create_task(worker()) for i in range(50)]
+ await asyncio.gather(*ts)
+ await p.wait()
+
+ assert len(p._pool) < 7
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds