From: Daniele Varrazzo Date: Thu, 5 Oct 2023 00:43:50 +0000 (+0200) Subject: refactor(pool): give sync attribute the same life cycle of the async one X-Git-Tag: pool-3.2.0~12^2~21 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=20e2d02a17d95ebe4f0d26d842c2e6ee288c6c7e;p=thirdparty%2Fpsycopg.git refactor(pool): give sync attribute the same life cycle of the async one Create possibly async objects only after we are sure that a loop is running. --- diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index b50e9b44e..9159b85ca 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -107,15 +107,16 @@ class ConnectionPool(Generic[CT], BasePool): self._reconnect_failed = reconnect_failed - self._lock = Lock() + self._lock: Lock + self._sched: Scheduler + self._tasks: Queue[MaintenanceTask] + self._waiting = Deque[WaitingClient[CT]]() # to notify that the pool is full self._pool_full_event: Optional[Event] = None - self._sched = Scheduler() self._sched_runner: Optional[threading.Thread] = None - self._tasks: Queue[MaintenanceTask] = Queue() self._workers: List[threading.Thread] = [] super().__init__( @@ -323,6 +324,8 @@ class ConnectionPool(Generic[CT], BasePool): because the pool was initialized with *open* = `!True`) but you cannot currently re-open a closed pool. """ + self._ensure_lock() + with self._lock: self._open() @@ -335,12 +338,24 @@ class ConnectionPool(Generic[CT], BasePool): self._check_open() + # A lock has been most likely, but not necessarily, created in `open()`. + self._ensure_lock() + + self._tasks = Queue() + self._sched = Scheduler() + self._closed = False self._opened = True self._start_workers() self._start_initial_tasks() + def _ensure_lock(self) -> None: + try: + self._lock + except AttributeError: + self._lock = Lock() + def _start_workers(self) -> None: self._sched_runner = spawn(self._sched.run, name=f"{self.name}-scheduler") assert not self._workers diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index 1f8dc9bcc..87b0a3d50 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -317,10 +317,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): currently re-open a closed pool. """ # Make sure the lock is created after there is an event loop - try: - self._lock - except AttributeError: - self._lock = ALock() + self._ensure_lock() async with self._lock: self._open() @@ -332,20 +329,15 @@ class AsyncConnectionPool(Generic[ACT], BasePool): if not self._closed: return - # Throw a RuntimeError if the pool is open outside a running loop. - asyncio.get_running_loop() - self._check_open() + # A lock has been most likely, but not necessarily, created in `open()`. + self._ensure_lock() + # Create these objects now to attach them to the right loop. # See #219 self._tasks = AQueue() self._sched = AsyncScheduler() - # This has been most likely, but not necessarily, created in `open()`. - try: - self._lock - except AttributeError: - self._lock = ALock() self._closed = False self._opened = True @@ -353,6 +345,20 @@ class AsyncConnectionPool(Generic[ACT], BasePool): self._start_workers() self._start_initial_tasks() + def _ensure_lock(self) -> None: + """Make sure the pool lock is created. + + In async code, also make sure that the loop is running. + """ + + # Throw a RuntimeError if the pool is open outside a running loop. + asyncio.get_running_loop() + + try: + self._lock + except AttributeError: + self._lock = ALock() + def _start_workers(self) -> None: self._sched_runner = aspawn(self._sched.run, name=f"{self.name}-scheduler") for i in range(self.num_workers):