# the connection is now back in the pool
+ .. automethod:: open
+
.. automethod:: close
.. note::
# the connection is now back in the pool
+ .. automethod:: open
+
.. automethod:: close
.. note::
Current release
---------------
+psycopg_pool 3.1.0
+^^^^^^^^^^^^^^^^^^
+
+- Add `ConnectionPool.open()` and `AsyncConnectionPool.open()`
+ (:ticket:`#155`).
+
psycopg_pool 3.0.2
^^^^^^^^^^^^^^^^^^
# connections to the pool.
self._growing = False
- # _close should be the last property to be set in the state
- # to avoid warning on __del__ in case __init__ fails.
- self._closed = False
+ self._closed = True
def __repr__(self) -> str:
return (
self._pool_full_event: Optional[threading.Event] = None
self._sched = Scheduler()
+ self._sched_runner: Optional[threading.Thread] = None
self._tasks: "Queue[MaintenanceTask]" = Queue()
self._workers: List[threading.Thread] = []
super().__init__(conninfo, **kwargs)
- self._sched_runner = threading.Thread(
- target=self._sched.run, name=f"{self.name}-scheduler", daemon=True
- )
- for i in range(self.num_workers):
- t = threading.Thread(
- target=self.worker,
- args=(self._tasks,),
- name=f"{self.name}-worker-{i}",
- daemon=True,
- )
- self._workers.append(t)
-
- # The object state is complete. Start the worker threads
- self._sched_runner.start()
- for t in self._workers:
- t.start()
-
- # populate the pool with initial min_size connections in background
- for i in range(self._nconns):
- self.run_task(AddConnection(self))
-
- # Schedule a task to shrink the pool if connections over min_size have
- # remained unused.
- self.schedule_task(ShrinkPool(self), self.max_idle)
+ self.open()
def __del__(self) -> None:
# If the '_closed' property is not set we probably failed in __init__.
else:
self._return_connection(conn)
+ def open(self) -> None:
+ """Open the pool by starting worker threads.
+
+ No-op if the pool is already open.
+ """
+ if not self._closed:
+ return
+
+ self._sched_runner = threading.Thread(
+ target=self._sched.run, name=f"{self.name}-scheduler", daemon=True
+ )
+ assert not self._workers
+ for i in range(self.num_workers):
+ t = threading.Thread(
+ target=self.worker,
+ args=(self._tasks,),
+ name=f"{self.name}-worker-{i}",
+ daemon=True,
+ )
+ self._workers.append(t)
+
+ # The object state is complete. Start the scheduler and the worker threads.
+ self._sched_runner.start()
+ for t in self._workers:
+ t.start()
+
+ # populate the pool with initial min_size connections in background
+ for i in range(self._nconns):
+ self.run_task(AddConnection(self))
+
+ # Schedule a task to shrink the pool if connections over min_size have
+ # remained unused.
+ self.schedule_task(ShrinkPool(self), self.max_idle)
+
+ self._closed = False
+
def close(self, timeout: float = 5.0) -> None:
"""Close the pool and make it unavailable to new clients.
conn.close()
# Wait for the worker threads to terminate
+ assert self._sched_runner is not None
if timeout > 0:
for t in [self._sched_runner] + self._workers:
if not t.is_alive():
self.name,
timeout,
)
+ self._sched_runner = None
def __enter__(self) -> "ConnectionPool":
return self
self._pool_full_event: Optional[asyncio.Event] = None
self._sched = AsyncScheduler()
+ self._sched_runner: Optional[Task[None]] = None
self._tasks: "asyncio.Queue[MaintenanceTask]" = asyncio.Queue()
self._workers: List[Task[None]] = []
super().__init__(conninfo, **kwargs)
- self._sched_runner = create_task(
- self._sched.run(), name=f"{self.name}-scheduler"
- )
- for i in range(self.num_workers):
- t = create_task(
- self.worker(self._tasks),
- name=f"{self.name}-worker-{i}",
- )
- self._workers.append(t)
-
- # populate the pool with initial min_size connections in background
- for i in range(self._nconns):
- self.run_task(AddConnection(self))
-
- # Schedule a task to shrink the pool if connections over min_size have
- # remained unused.
- self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))
+ self.open()
async def wait(self, timeout: float = 30.0) -> None:
async with self._lock:
else:
await self._return_connection(conn)
+ def open(self) -> None:
+ """Open the pool by starting worker tasks.
+
+ No-op if the pool is already open.
+ """
+ if not self._closed:
+ return
+
+ self._sched_runner = create_task(
+ self._sched.run(), name=f"{self.name}-scheduler"
+ )
+ for i in range(self.num_workers):
+ t = create_task(
+ self.worker(self._tasks),
+ name=f"{self.name}-worker-{i}",
+ )
+ self._workers.append(t)
+
+ # populate the pool with initial min_size connections in background
+ for i in range(self._nconns):
+ self.run_task(AddConnection(self))
+
+ # Schedule a task to shrink the pool if connections over min_size have
+ # remained unused.
+ self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))
+
+ self._closed = False
+
async def close(self, timeout: float = 5.0) -> None:
if self._closed:
return
await conn.close()
# Wait for the worker tasks to terminate
+ assert self._sched_runner is not None
wait = asyncio.gather(self._sched_runner, *self._workers)
try:
if timeout > 0:
self.name,
timeout,
)
+ self._sched_runner = None
async def __aenter__(self) -> "AsyncConnectionPool":
return self
def test_close_no_threads(dsn):
p = pool.ConnectionPool(dsn)
- assert p._sched_runner.is_alive()
+ assert p._sched_runner and p._sched_runner.is_alive()
for t in p._workers:
assert t.is_alive()
p.close()
- assert not p._sched_runner.is_alive()
+ assert p._sched_runner is None
for t in p._workers:
assert not t.is_alive()
@pytest.mark.slow
def test_del_stop_threads(dsn):
p = pool.ConnectionPool(dsn)
+ assert p._sched_runner is not None
ts = [p._sched_runner] + p._workers
del p
sleep(0.1)
async def test_close_no_tasks(dsn):
p = pool.AsyncConnectionPool(dsn)
- assert not p._sched_runner.done()
+ assert p._sched_runner and not p._sched_runner.done()
for t in p._workers:
assert not t.done()
await p.close()
- assert p._sched_runner.done()
+ assert p._sched_runner is None
for t in p._workers:
assert t.done()