kwargs: Optional[Dict[str, Any]] = None,
min_size: int = 4,
max_size: Optional[int] = None,
+ open: bool = True,
name: Optional[str] = None,
timeout: float = 30.0,
max_waiting: int = 0,
self,
conninfo: str = "",
*,
+ open: bool = True,
connection_class: Type[Connection[Any]] = Connection,
configure: Optional[Callable[[Connection[Any]], None]] = None,
reset: Optional[Callable[[Connection[Any]], None]] = None,
super().__init__(conninfo, **kwargs)
- self.open()
+ if open:
+ self.open()
def __del__(self) -> None:
# If the '_closed' property is not set we probably failed in __init__.
No-op if the pool is already opened.
"""
- if not self._closed:
- return
+ with self._lock:
+ if not self._closed:
+ return
- self._check_open()
+ self._check_open()
+ self._start_workers()
+
+ self._closed = False
+ self._opened = True
+
+ def _start_workers(self) -> None:
self._sched_runner = threading.Thread(
- target=self._sched.run, name=f"{self.name}-scheduler", daemon=True
+ target=self._sched.run,
+ name=f"{self.name}-scheduler",
+ daemon=True,
)
assert not self._workers
for i in range(self.num_workers):
# remained unused.
self.schedule_task(ShrinkPool(self), self.max_idle)
- self._closed = False
- self._opened = True
-
def close(self, timeout: float = 5.0) -> None:
"""Close the pool and make it unavailable to new clients.
)
def __enter__(self) -> "ConnectionPool":
+ self.open()
return self
def __exit__(
self,
conninfo: str = "",
*,
+ open: bool = True,
connection_class: Type[AsyncConnection[Any]] = AsyncConnection,
configure: Optional[
Callable[[AsyncConnection[Any]], Awaitable[None]]
super().__init__(conninfo, **kwargs)
- self.open()
+ if open:
+ self.open()
async def wait(self, timeout: float = 30.0) -> None:
async with self._lock:
await self._return_connection(conn)
def open(self) -> None:
- """Open the pool by starting worker tasks.
-
- No-op if the pool is already opened.
- """
if not self._closed:
return
self._check_open()
+ self._start_workers()
+
+ self._closed = False
+ self._opened = True
+
+ def _start_workers(self) -> None:
self._sched_runner = create_task(
self._sched.run(), name=f"{self.name}-scheduler"
)
# remained unused.
self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))
- self._closed = False
- self._opened = True
-
async def close(self, timeout: float = 5.0) -> None:
if self._closed:
return
)
async def __aenter__(self) -> "AsyncConnectionPool":
+ self.open()
return self
async def __aexit__(
assert len(success) == 2
+def test_open_explicit(dsn):
+ p = pool.ConnectionPool(dsn, open=False)
+ assert p.closed
+ with pytest.raises(pool.PoolClosed):
+ p.getconn()
+
+ with pytest.raises(pool.PoolClosed):
+ with p.connection():
+ pass
+
+ p.open()
+ try:
+ assert not p.closed
+
+ with p.connection() as conn:
+ cur = conn.execute("select 1")
+ assert cur.fetchone() == (1,)
+
+ finally:
+ p.close()
+
+
+def test_open_context(dsn):
+ p = pool.ConnectionPool(dsn, open=False)
+ assert p.closed
+
+ with p:
+ assert not p.closed
+
+ with p.connection() as conn:
+ cur = conn.execute("select 1")
+ assert cur.fetchone() == (1,)
+
+ assert p.closed
+
+
+def test_open_no_op(dsn):
+ p = pool.ConnectionPool(dsn)
+ try:
+ assert not p.closed
+ p.open()
+ assert not p.closed
+
+ with p.connection() as conn:
+ cur = conn.execute("select 1")
+ assert cur.fetchone() == (1,)
+
+ finally:
+ p.close()
+
+
def test_reopen(dsn):
p = pool.ConnectionPool(dsn)
with p.connection() as conn:
assert len(success) == 2
+async def test_open_explicit(dsn):
+ p = pool.AsyncConnectionPool(dsn, open=False)
+ assert p.closed
+ with pytest.raises(pool.PoolClosed):
+ await p.getconn()
+
+ with pytest.raises(pool.PoolClosed):
+ async with p.connection():
+ pass
+
+ p.open()
+ try:
+ assert not p.closed
+
+ async with p.connection() as conn:
+ cur = await conn.execute("select 1")
+ assert await cur.fetchone() == (1,)
+
+ finally:
+ await p.close()
+
+
+async def test_open_context(dsn):
+ p = pool.AsyncConnectionPool(dsn, open=False)
+ assert p.closed
+
+ async with p:
+ assert not p.closed
+
+ async with p.connection() as conn:
+ cur = await conn.execute("select 1")
+ assert await cur.fetchone() == (1,)
+
+ assert p.closed
+
+
+async def test_open_no_op(dsn):
+ p = pool.AsyncConnectionPool(dsn)
+ try:
+ assert not p.closed
+ p.open()
+ assert not p.closed
+
+ async with p.connection() as conn:
+ cur = await conn.execute("select 1")
+ assert await cur.fetchone() == (1,)
+
+ finally:
+ await p.close()
+
+
async def test_reopen(dsn):
p = pool.AsyncConnectionPool(dsn)
async with p.connection() as conn: