_CONNECTIONS_LOST = "connections_lost"
_pool: deque[Any]
+ _given: dict[int, Any]
def __init__(
self,
self._nconns = min_size # currently in the pool, out, being prepared
self._pool = deque()
+ self._given = {}
self._stats = Counter[str]()
# Min number of connections in the pool in a max_idle unit of time.
class ConnectionPool(Generic[CT], BasePool):
_pool: deque[CT]
+ _given: dict[int, CT]
def __init__(
self,
if self._pool:
# Take a connection ready out of the pool
conn = self._pool.popleft()
+ self._given[id(conn)] = conn
if len(self._pool) < self._nconns_min:
self._nconns_min = len(self._pool)
elif self.max_waiting and len(self._waiting) >= self.max_waiting:
self._putconn(conn, from_getconn=False)
+ def drain(self) -> None:
+ """
+ Remove all the connections from the pool and create new ones.
+
+ If a connection is currently out of the pool it will be closed when
+ returned to the pool and replaced with a new one.
+
+ This method is useful to force a connection re-configuration, for
+ example when the adapters map changes after the pool was created.
+ """
+ with self._lock:
+ conns = list(self._pool)
+ self._pool.clear()
+
+ # Mark the currently given connections as already expired,
+ # so they will be closed as soon as returned.
+ earlier = monotonic() - 1.0
+ for conn in self._given.values():
+ conn._expire_at = earlier
+
+ # Close the connection already in the pool, open new ones.
+ for conn in conns:
+ conn.close()
+ self.run_task(AddConnection(self))
+
def _putconn(self, conn: CT, from_getconn: bool) -> None:
# Use a worker to perform eventual maintenance work in a separate task
if self._reset:
break
else:
# No client waiting for a connection: put it back into the pool
+ self._given.pop(id(conn), None)
self._pool.append(conn)
# If we have been asked to wait for pool init, notify the
# waiter if the pool is full.
class AsyncConnectionPool(Generic[ACT], BasePool):
_pool: deque[ACT]
+ _given: dict[int, ACT]
def __init__(
self,
if self._pool:
# Take a connection ready out of the pool
conn = self._pool.popleft()
+ self._given[id(conn)] = conn
if len(self._pool) < self._nconns_min:
self._nconns_min = len(self._pool)
elif self.max_waiting and len(self._waiting) >= self.max_waiting:
await self._putconn(conn, from_getconn=False)
+ async def drain(self) -> None:
+ """
+ Remove all the connections from the pool and create new ones.
+
+ If a connection is currently out of the pool it will be closed when
+ returned to the pool and replaced with a new one.
+
+ This method is useful to force a connection re-configuration, for
+ example when the adapters map changes after the pool was created.
+ """
+ async with self._lock:
+ conns = list(self._pool)
+ self._pool.clear()
+
+ # Mark the currently given connections as already expired,
+ # so they will be closed as soon as returned.
+ earlier = monotonic() - 1.0
+ for conn in self._given.values():
+ conn._expire_at = earlier
+
+ # Close the connection already in the pool, open new ones.
+ for conn in conns:
+ await conn.close()
+ self.run_task(AddConnection(self))
+
async def _putconn(self, conn: ACT, from_getconn: bool) -> None:
# Use a worker to perform eventual maintenance work in a separate task
if self._reset:
break
else:
# No client waiting for a connection: put it back into the pool
+ self._given.pop(id(conn), None)
self._pool.append(conn)
# If we have been asked to wait for pool init, notify the
# waiter if the pool is full.
assert time() - t0 <= 1.5
+@pytest.mark.crdb_skip("backend pid")
+def test_drain(pool_cls, dsn):
+ pids1 = set()
+ pids2 = set()
+ with pool_cls(dsn, min_size=min_size(pool_cls, 2)) as p:
+ p.wait()
+
+ with p.connection() as conn:
+ pids1.add(conn.info.backend_pid)
+ with p.connection() as conn2:
+ pids1.add(conn2.info.backend_pid)
+ p.drain()
+ assert len(pids1) == 2
+
+ with p.connection() as conn:
+ pids2.add(conn.info.backend_pid)
+ with p.connection() as conn2:
+ pids2.add(conn2.info.backend_pid)
+
+ assert len(pids2) == 2
+
+ assert not pids1 & pids2
+
+
@skip_sync
def test_cancellation_in_queue(pool_cls, dsn):
# https://github.com/psycopg/psycopg/issues/509
assert time() - t0 <= 1.5
+@pytest.mark.crdb_skip("backend pid")
+async def test_drain(pool_cls, dsn):
+ pids1 = set()
+ pids2 = set()
+ async with pool_cls(dsn, min_size=min_size(pool_cls, 2)) as p:
+ await p.wait()
+
+ async with p.connection() as conn:
+ pids1.add(conn.info.backend_pid)
+ async with p.connection() as conn2:
+ pids1.add(conn2.info.backend_pid)
+ await p.drain()
+ assert len(pids1) == 2
+
+ async with p.connection() as conn:
+ pids2.add(conn.info.backend_pid)
+ async with p.connection() as conn2:
+ pids2.add(conn2.info.backend_pid)
+
+ assert len(pids2) == 2
+
+ assert not pids1 & pids2
+
+
@skip_sync
async def test_cancellation_in_queue(pool_cls, dsn):
# https://github.com/psycopg/psycopg/issues/509