From c5503975baf880fb81488c436c628e3cd3d95628 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sun, 22 Dec 2024 20:38:58 +0100 Subject: [PATCH] fix: always gather the notifications received Starting to register them after the first call to notifies() is somewhat weird. We also risk to lose notifications in a case such as: conn.execute("listen foo") conn.execute("listen bar") for n in conn.notifies(): ... --- docs/news.rst | 4 ++-- psycopg/psycopg/_connection_base.py | 5 ++--- psycopg/psycopg/connection.py | 10 ++-------- psycopg/psycopg/connection_async.py | 11 ++--------- tests/test_notify.py | 17 +++++++++++++++++ tests/test_notify_async.py | 17 +++++++++++++++++ 6 files changed, 42 insertions(+), 22 deletions(-) diff --git a/docs/news.rst b/docs/news.rst index e284ddedd..d222448c5 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -19,8 +19,8 @@ Python 3.3.0 (unreleased) Psycopg 3.2.4 (unreleased) ^^^^^^^^^^^^^^^^^^^^^^^^^^ -- Don't lose notifies received between two `~Connection.notifies()` calls - (:ticket:`#962`). +- Don't lose notifies received whilst the `~Connection.notifies()` iterator + is not running (:ticket:`#962`). - Make sure that the notifies callback is called during the use of the `~Connection.notifies()` generator (:ticket:`#972`). diff --git a/psycopg/psycopg/_connection_base.py b/psycopg/psycopg/_connection_base.py index a2b7ac44e..ab34a7cc0 100644 --- a/psycopg/psycopg/_connection_base.py +++ b/psycopg/psycopg/_connection_base.py @@ -117,12 +117,11 @@ class BaseConnection(Generic[Row]): pgconn.notify_handler = partial(BaseConnection._notify_handler, wself) # Gather notifies when the notifies() generator is not running. - # This handler is registered after notifies() is used te first time. - # backlog = None means that the handler hasn't been registered. - self._notifies_backlog: Deque[Notify] | None = None + self._notifies_backlog = Deque[Notify]() self._notifies_backlog_handler = partial( BaseConnection._add_notify_to_backlog, wself ) + self.add_notify_handler(self._notifies_backlog_handler) # Attribute is only set if the connection is from a pool so we can tell # apart a connection in the pool too (when _pool = None) diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index 598dac7c8..4a590805f 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -23,7 +23,7 @@ from ._tpc import Xid from .rows import Row, RowFactory, tuple_row, args_row from .adapt import AdaptersMap from ._enums import IsolationLevel -from ._compat import Deque, Self +from ._compat import Self from .conninfo import make_conninfo, conninfo_to_dict from .conninfo import conninfo_attempts, timeout_from_conninfo from ._pipeline import Pipeline @@ -339,11 +339,9 @@ class Connection(BaseConnection[Row]): with self.lock: enc = self.pgconn._encoding - # If the backlog is set to not-None, then the handler is also set. # Remove the handler for the duration of this critical section to # avoid reporting notifies twice. - if self._notifies_backlog is not None: - self.remove_notify_handler(self._notifies_backlog_handler) + self.remove_notify_handler(self._notifies_backlog_handler) try: while True: @@ -378,10 +376,6 @@ class Connection(BaseConnection[Row]): if interval < 0.0: break finally: - # Install, or re-install, the backlog notify handler - # to catch notifications received while the generator was off. - if self._notifies_backlog is None: - self._notifies_backlog = Deque() self.add_notify_handler(self._notifies_backlog_handler) @contextmanager diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 577199dd0..00700a8e1 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -20,7 +20,7 @@ from ._tpc import Xid from .rows import Row, AsyncRowFactory, tuple_row, args_row from .adapt import AdaptersMap from ._enums import IsolationLevel -from ._compat import Deque, Self +from ._compat import Self from .conninfo import make_conninfo, conninfo_to_dict from .conninfo import conninfo_attempts_async, timeout_from_conninfo from ._pipeline import AsyncPipeline @@ -359,11 +359,9 @@ class AsyncConnection(BaseConnection[Row]): async with self.lock: enc = self.pgconn._encoding - # If the backlog is set to not-None, then the handler is also set. # Remove the handler for the duration of this critical section to # avoid reporting notifies twice. - if self._notifies_backlog is not None: - self.remove_notify_handler(self._notifies_backlog_handler) + self.remove_notify_handler(self._notifies_backlog_handler) try: while True: @@ -401,11 +399,6 @@ class AsyncConnection(BaseConnection[Row]): if interval < 0.0: break finally: - # Install, or re-install, the backlog notify handler - # to catch notifications received while the generator was off. - if self._notifies_backlog is None: - self._notifies_backlog = Deque() - self.add_notify_handler(self._notifies_backlog_handler) @asynccontextmanager diff --git a/tests/test_notify.py b/tests/test_notify.py index 3871722dd..8f741619d 100644 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -255,6 +255,23 @@ def test_generator_and_handler(conn, conn_cls, dsn): assert n2 +@pytest.mark.parametrize("query_between", [True, False]) +def test_first_notify_not_lost(conn, conn_cls, dsn, query_between): + conn.set_autocommit(True) + conn.execute("listen foo") + + with conn_cls.connect(dsn, autocommit=True) as conn2: + conn2.execute("notify foo, 'hi'") + + if query_between: + conn.execute("select 1") + + n = None + for n in conn.notifies(timeout=1, stop_after=1): + pass + assert n + + @pytest.mark.slow @pytest.mark.timing @pytest.mark.parametrize("sleep_on", ["server", "client"]) diff --git a/tests/test_notify_async.py b/tests/test_notify_async.py index aebc333b0..8c87f3349 100644 --- a/tests/test_notify_async.py +++ b/tests/test_notify_async.py @@ -252,6 +252,23 @@ async def test_generator_and_handler(aconn, aconn_cls, dsn): assert n2 +@pytest.mark.parametrize("query_between", [True, False]) +async def test_first_notify_not_lost(aconn, aconn_cls, dsn, query_between): + await aconn.set_autocommit(True) + await aconn.execute("listen foo") + + async with await aconn_cls.connect(dsn, autocommit=True) as conn2: + await conn2.execute("notify foo, 'hi'") + + if query_between: + await aconn.execute("select 1") + + n = None + async for n in aconn.notifies(timeout=1, stop_after=1): + pass + assert n + + @pytest.mark.slow @pytest.mark.timing @pytest.mark.parametrize("sleep_on", ["server", "client"]) -- 2.39.5