you don't use the pool as a context manager) you might want to specify
this parameter explicitly.
- .. automethod:: wait
.. automethod:: connection
.. code:: python
with ConnectionPool(...) as pool:
# code using the pool
+ .. automethod:: wait
+
.. attribute:: name
:type: str
the pool.
:type reset: `async Callable[[AsyncConnection], None]`
- .. automethod:: wait
.. automethod:: connection
.. code:: python
# the connection is now back in the pool
.. automethod:: open
-
.. automethod:: close
.. note::
async with AsyncConnectionPool(...) as pool:
# code using the pool
+ .. automethod:: wait
.. automethod:: resize
.. automethod:: check
.. automethod:: getconn
else:
self._return_connection(conn)
- def open(self) -> None:
+ def open(self, wait: bool = False, timeout: float = 30.0) -> None:
"""Open the pool by starting connecting and and accepting clients.
- The method is no-op if the pool is already opened (because the method
- was already called, or because the pool context was entered, or because
- the pool was initialized with ``open=true``.
+ If *wait* is `!False`, return immediately and let the background worker
+ fill the pool if `min_size` > 0. Otherwise wait up to *timeout* seconds
+ for the requested number of connections to be ready (see `wait()` for
+ details).
+
+ It is safe to call `!open()` again on a pool already open (because the
+ method was already called, or because the pool context was entered, or
+ because the pool was initialized with *open* = `!True`) but you cannot
+ currently re-open a closed pool.
"""
with self._lock:
- if not self._closed:
- return
+ self._open()
+
+ if wait:
+ self.wait(timeout=timeout)
+
+ def _open(self) -> None:
+ if not self._closed:
+ return
- self._check_open()
+ self._check_open()
- self._start_workers()
- self._start_initial_tasks()
+ self._start_workers()
+ self._start_initial_tasks()
- self._closed = False
- self._opened = True
+ self._closed = False
+ self._opened = True
def _start_workers(self) -> None:
self._sched_runner = threading.Thread(
else:
await self._return_connection(conn)
- async def open(self) -> None:
+ async def open(self, wait: bool = False, timeout: float = 30.0) -> None:
async with self._lock:
self._open()
+ if wait:
+ await self.wait(timeout=timeout)
+
def _open(self) -> None:
if not self._closed:
return
p.close()
+@pytest.mark.slow
+@pytest.mark.timing
+def test_open_wait(dsn, monkeypatch):
+ delay_connection(monkeypatch, 0.1)
+ with pytest.raises(pool.PoolTimeout):
+ p = pool.ConnectionPool(dsn, min_size=4, num_workers=1, open=False)
+ try:
+ p.open(wait=True, timeout=0.3)
+ finally:
+ p.close()
+
+ p = pool.ConnectionPool(dsn, min_size=4, num_workers=1, open=False)
+ try:
+ p.open(wait=True, timeout=0.5)
+ finally:
+ p.close()
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+def test_open_as_wait(dsn, monkeypatch):
+ delay_connection(monkeypatch, 0.1)
+ with pytest.raises(pool.PoolTimeout):
+ with pool.ConnectionPool(dsn, min_size=4, num_workers=1) as p:
+ p.open(wait=True, timeout=0.3)
+
+ with pool.ConnectionPool(dsn, min_size=4, num_workers=1) as p:
+ p.open(wait=True, timeout=0.5)
+
+
def test_reopen(dsn):
p = pool.ConnectionPool(dsn)
with p.connection() as conn:
await p.close()
+@pytest.mark.slow
+@pytest.mark.timing
+async def test_open_wait(dsn, monkeypatch):
+ delay_connection(monkeypatch, 0.1)
+ with pytest.raises(pool.PoolTimeout):
+ p = pool.AsyncConnectionPool(
+ dsn, min_size=4, num_workers=1, open=False
+ )
+ try:
+ await p.open(wait=True, timeout=0.3)
+ finally:
+ await p.close()
+
+ p = pool.AsyncConnectionPool(dsn, min_size=4, num_workers=1, open=False)
+ try:
+ await p.open(wait=True, timeout=0.5)
+ finally:
+ await p.close()
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+async def test_open_as_wait(dsn, monkeypatch):
+ delay_connection(monkeypatch, 0.1)
+ with pytest.raises(pool.PoolTimeout):
+ async with pool.AsyncConnectionPool(
+ dsn, min_size=4, num_workers=1
+ ) as p:
+ await p.open(wait=True, timeout=0.3)
+
+ async with pool.AsyncConnectionPool(dsn, min_size=4, num_workers=1) as p:
+ await p.open(wait=True, timeout=0.5)
+
+
async def test_reopen(dsn):
p = pool.AsyncConnectionPool(dsn)
async with p.connection() as conn: