From: Daniele Varrazzo Date: Fri, 10 Jan 2025 14:17:01 +0000 (+0100) Subject: refactor: don't keep the notifiers backlog handler in the connection state X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=refs%2Fpull%2F992%2Fhead;p=thirdparty%2Fpsycopg.git refactor: don't keep the notifiers backlog handler in the connection state Just keep the queue in the state and special-case its handling in the `_notify_handler` connection method instead of registering a standard handler. Set the queue to None to signify that we are in the `notifies()` generator. This way we don't need the awkward weak-self + class method to avoid a reference loop and to dereference the connection weak reference another time, as we just did in `_notify_handler()`. Setting the queue to None also feels cleaner than adding/removing the handler. Relates to #975. --- diff --git a/psycopg/psycopg/_connection_base.py b/psycopg/psycopg/_connection_base.py index a260fe177..a5222f631 100644 --- a/psycopg/psycopg/_connection_base.py +++ b/psycopg/psycopg/_connection_base.py @@ -113,17 +113,14 @@ class BaseConnection(Generic[Row]): self._prepared: PrepareManager = PrepareManager() self._tpc: tuple[Xid, bool] | None = None # xid, prepared + # Gather notifies when the notifies() generator is not running. + # It will be set to None during `notifies()` generator run. + self._notifies_backlog: deque[Notify] | None = deque() + wself = ref(self) pgconn.notice_handler = partial(BaseConnection._notice_handler, wself) pgconn.notify_handler = partial(BaseConnection._notify_handler, wself) - # Gather notifies when the notifies() generator is not running. - self._notifies_backlog = deque[Notify]() - self._notifies_backlog_handler = partial( - BaseConnection._add_notify_to_backlog, wself - ) - self.add_notify_handler(self._notifies_backlog_handler) - # Attribute is only set if the connection is from a pool so we can tell # apart a connection in the pool too (when _pool = None) self._pool: BasePool | None @@ -376,24 +373,19 @@ class BaseConnection(Generic[Row]): def _notify_handler( wself: ReferenceType[BaseConnection[Row]], pgn: pq.PGnotify ) -> None: - self = wself() - if not (self and self._notify_handlers): + if not (self := wself()): return enc = self.pgconn._encoding n = Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid) + + # `_notifies_backlog` is None if the `notifies()` generator is running + if (d := self._notifies_backlog) is not None: + d.append(n) + for cb in self._notify_handlers: cb(n) - @staticmethod - def _add_notify_to_backlog( - wself: ReferenceType[BaseConnection[Row]], notify: Notify - ) -> None: - self = wself() - if not self or self._notifies_backlog is None: - return - self._notifies_backlog.append(notify) - @property def prepare_threshold(self) -> int | None: """ diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index 2f0c03e5a..a0fd1cdee 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -340,17 +340,17 @@ class Connection(BaseConnection[Row]): with self.lock: enc = self.pgconn._encoding - # Remove the handler for the duration of this critical section to - # avoid reporting notifies twice. - self.remove_notify_handler(self._notifies_backlog_handler) + # Remove the backlog deque for the duration of this critical + # section to avoid reporting notifies twice. + self._notifies_backlog, d = (None, self._notifies_backlog) try: while True: # if notifies were received when the generator was off, # return them in a first batch. - if self._notifies_backlog: - while self._notifies_backlog: - yield self._notifies_backlog.popleft() + if d: + while d: + yield d.popleft() nreceived += 1 else: try: @@ -377,7 +377,7 @@ class Connection(BaseConnection[Row]): if interval < 0.0: break finally: - self.add_notify_handler(self._notifies_backlog_handler) + self._notifies_backlog = d @contextmanager def pipeline(self) -> Iterator[Pipeline]: diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 203976078..840839508 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -359,17 +359,17 @@ class AsyncConnection(BaseConnection[Row]): async with self.lock: enc = self.pgconn._encoding - # Remove the handler for the duration of this critical section to - # avoid reporting notifies twice. - self.remove_notify_handler(self._notifies_backlog_handler) + # Remove the backlog deque for the duration of this critical + # section to avoid reporting notifies twice. + self._notifies_backlog, d = None, self._notifies_backlog try: while True: # if notifies were received when the generator was off, # return them in a first batch. - if self._notifies_backlog: - while self._notifies_backlog: - yield self._notifies_backlog.popleft() + if d: + while d: + yield d.popleft() nreceived += 1 else: try: @@ -399,7 +399,7 @@ class AsyncConnection(BaseConnection[Row]): if interval < 0.0: break finally: - self.add_notify_handler(self._notifies_backlog_handler) + self._notifies_backlog = d @asynccontextmanager async def pipeline(self) -> AsyncIterator[AsyncPipeline]: