From: Denis Laxalde Date: Mon, 15 Nov 2021 08:13:04 +0000 (+0100) Subject: Add an open() method to connection pool classes X-Git-Tag: pool-3.1~45^2~8 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fa541aa7a013372e2395378d7975be96b3c78454;p=thirdparty%2Fpsycopg.git Add an open() method to connection pool classes This method is responsible for setting the '_closed' attribute, which hence now defaults to True in the base class, along with the _sched_runner attribute, which is reset to None in close(). --- diff --git a/docs/api/pool.rst b/docs/api/pool.rst index 3604c614e..66b6de2ab 100644 --- a/docs/api/pool.rst +++ b/docs/api/pool.rst @@ -145,6 +145,8 @@ The `!ConnectionPool` class # the connection is now back in the pool + .. automethod:: open + .. automethod:: close .. note:: @@ -233,6 +235,8 @@ listed here. # the connection is now back in the pool + .. automethod:: open + .. automethod:: close .. note:: diff --git a/docs/news_pool.rst b/docs/news_pool.rst index 3dfca7fc3..62c6a3355 100644 --- a/docs/news_pool.rst +++ b/docs/news_pool.rst @@ -10,6 +10,12 @@ Current release --------------- +psycopg_pool 3.1.0 +^^^^^^^^^^^^^^^^^^ + +- Add `ConnectionPool.open()` and `AsyncConnectionPool.open()` + (:ticket:`#155`). + psycopg_pool 3.0.2 ^^^^^^^^^^^^^^^^^^ diff --git a/psycopg_pool/psycopg_pool/base.py b/psycopg_pool/psycopg_pool/base.py index 160221507..129ffc623 100644 --- a/psycopg_pool/psycopg_pool/base.py +++ b/psycopg_pool/psycopg_pool/base.py @@ -94,9 +94,7 @@ class BasePool(Generic[ConnectionType]): # connections to the pool. self._growing = False - # _close should be the last property to be set in the state - # to avoid warning on __del__ in case __init__ fails. - self._closed = False + self._closed = True def __repr__(self) -> str: return ( diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index dcbcde02a..a6ec95601 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -48,35 +48,13 @@ class ConnectionPool(BasePool[Connection[Any]]): self._pool_full_event: Optional[threading.Event] = None self._sched = Scheduler() + self._sched_runner: Optional[threading.Thread] = None self._tasks: "Queue[MaintenanceTask]" = Queue() self._workers: List[threading.Thread] = [] super().__init__(conninfo, **kwargs) - self._sched_runner = threading.Thread( - target=self._sched.run, name=f"{self.name}-scheduler", daemon=True - ) - for i in range(self.num_workers): - t = threading.Thread( - target=self.worker, - args=(self._tasks,), - name=f"{self.name}-worker-{i}", - daemon=True, - ) - self._workers.append(t) - - # The object state is complete. Start the worker threads - self._sched_runner.start() - for t in self._workers: - t.start() - - # populate the pool with initial min_size connections in background - for i in range(self._nconns): - self.run_task(AddConnection(self)) - - # Schedule a task to shrink the pool if connections over min_size have - # remained unused. - self.schedule_task(ShrinkPool(self), self.max_idle) + self.open() def __del__(self) -> None: # If the '_closed' property is not set we probably failed in __init__. @@ -254,6 +232,42 @@ class ConnectionPool(BasePool[Connection[Any]]): else: self._return_connection(conn) + def open(self) -> None: + """Open the pool by starting worker threads. + + No-op if the pool is already opened. + """ + if not self._closed: + return + + self._sched_runner = threading.Thread( + target=self._sched.run, name=f"{self.name}-scheduler", daemon=True + ) + assert not self._workers + for i in range(self.num_workers): + t = threading.Thread( + target=self.worker, + args=(self._tasks,), + name=f"{self.name}-worker-{i}", + daemon=True, + ) + self._workers.append(t) + + # The object state is complete. Start the worker threads + self._sched_runner.start() + for t in self._workers: + t.start() + + # populate the pool with initial min_size connections in background + for i in range(self._nconns): + self.run_task(AddConnection(self)) + + # Schedule a task to shrink the pool if connections over min_size have + # remained unused. + self.schedule_task(ShrinkPool(self), self.max_idle) + + self._closed = False + def close(self, timeout: float = 5.0) -> None: """Close the pool and make it unavailable to new clients. @@ -297,6 +311,7 @@ class ConnectionPool(BasePool[Connection[Any]]): conn.close() # Wait for the worker threads to terminate + assert self._sched_runner is not None if timeout > 0: for t in [self._sched_runner] + self._workers: if not t.is_alive(): @@ -309,6 +324,7 @@ class ConnectionPool(BasePool[Connection[Any]]): self.name, timeout, ) + self._sched_runner = None def __enter__(self) -> "ConnectionPool": return self diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index 7c4f4548a..cf6887ef4 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -57,28 +57,13 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): self._pool_full_event: Optional[asyncio.Event] = None self._sched = AsyncScheduler() + self._sched_runner: Optional[Task[None]] = None self._tasks: "asyncio.Queue[MaintenanceTask]" = asyncio.Queue() self._workers: List[Task[None]] = [] super().__init__(conninfo, **kwargs) - self._sched_runner = create_task( - self._sched.run(), name=f"{self.name}-scheduler" - ) - for i in range(self.num_workers): - t = create_task( - self.worker(self._tasks), - name=f"{self.name}-worker-{i}", - ) - self._workers.append(t) - - # populate the pool with initial min_size connections in background - for i in range(self._nconns): - self.run_task(AddConnection(self)) - - # Schedule a task to shrink the pool if connections over min_size have - # remained unused. - self.run_task(Schedule(self, ShrinkPool(self), self.max_idle)) + self.open() async def wait(self, timeout: float = 30.0) -> None: async with self._lock: @@ -205,6 +190,34 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): else: await self._return_connection(conn) + def open(self) -> None: + """Open the pool by starting worker tasks. + + No-op if the pool is already opened. + """ + if not self._closed: + return + + self._sched_runner = create_task( + self._sched.run(), name=f"{self.name}-scheduler" + ) + for i in range(self.num_workers): + t = create_task( + self.worker(self._tasks), + name=f"{self.name}-worker-{i}", + ) + self._workers.append(t) + + # populate the pool with initial min_size connections in background + for i in range(self._nconns): + self.run_task(AddConnection(self)) + + # Schedule a task to shrink the pool if connections over min_size have + # remained unused. + self.run_task(Schedule(self, ShrinkPool(self), self.max_idle)) + + self._closed = False + async def close(self, timeout: float = 5.0) -> None: if self._closed: return @@ -238,6 +251,7 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): await conn.close() # Wait for the worker tasks to terminate + assert self._sched_runner is not None wait = asyncio.gather(self._sched_runner, *self._workers) try: if timeout > 0: @@ -250,6 +264,7 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): self.name, timeout, ) + self._sched_runner = None async def __aenter__(self) -> "AsyncConnectionPool": return self diff --git a/tests/pool/test_pool.py b/tests/pool/test_pool.py index 748fd9369..e96e0518e 100644 --- a/tests/pool/test_pool.py +++ b/tests/pool/test_pool.py @@ -561,12 +561,12 @@ def test_fail_rollback_close(dsn, caplog, monkeypatch): def test_close_no_threads(dsn): p = pool.ConnectionPool(dsn) - assert p._sched_runner.is_alive() + assert p._sched_runner and p._sched_runner.is_alive() for t in p._workers: assert t.is_alive() p.close() - assert not p._sched_runner.is_alive() + assert p._sched_runner is None for t in p._workers: assert not t.is_alive() @@ -603,6 +603,7 @@ def test_del_no_warning(dsn, recwarn): @pytest.mark.slow def test_del_stop_threads(dsn): p = pool.ConnectionPool(dsn) + assert p._sched_runner is not None ts = [p._sched_runner] + p._workers del p sleep(0.1) diff --git a/tests/pool/test_pool_async.py b/tests/pool/test_pool_async.py index 9ec20f22a..2efab352f 100644 --- a/tests/pool/test_pool_async.py +++ b/tests/pool/test_pool_async.py @@ -576,12 +576,12 @@ async def test_fail_rollback_close(dsn, caplog, monkeypatch): async def test_close_no_tasks(dsn): p = pool.AsyncConnectionPool(dsn) - assert not p._sched_runner.done() + assert p._sched_runner and not p._sched_runner.done() for t in p._workers: assert not t.done() await p.close() - assert p._sched_runner.done() + assert p._sched_runner is None for t in p._workers: assert t.done()