From: Daniele Varrazzo Date: Wed, 14 May 2025 15:16:03 +0000 (+0200) Subject: fix: collect notifies only if no handler was registered X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=refs%2Fpull%2F1092%2Fhead;p=thirdparty%2Fpsycopg.git fix: collect notifies only if no handler was registered If someone is listening to notifications by using an handler, the notifies backlog would fill without ever being emptied. This change has the risk of breaking something if someone is relying on notifies being received both via callback and via generator, but I don't know how to satisfy it without creating a leak to users who don't use the generator. Will ask around... Fix #1091. --- diff --git a/psycopg/psycopg/_connection_base.py b/psycopg/psycopg/_connection_base.py index 7ac6af1a5..59dd17fd8 100644 --- a/psycopg/psycopg/_connection_base.py +++ b/psycopg/psycopg/_connection_base.py @@ -376,12 +376,13 @@ class BaseConnection(Generic[Row]): enc = self.pgconn._encoding n = Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid) - # `_notifies_backlog` is None if the `notifies()` generator is running - if (d := self._notifies_backlog) is not None: - d.append(n) - - for cb in self._notify_handlers: - cb(n) + if self._notify_handlers: + for cb in self._notify_handlers: + cb(n) + else: + # `_notifies_backlog` is None if the `notifies()` generator is running + if (d := self._notifies_backlog) is not None: + d.append(n) @property def prepare_threshold(self) -> int | None: diff --git a/tests/test_notify.py b/tests/test_notify.py index e811f0e71..99c99e698 100644 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -276,10 +276,10 @@ def test_first_notify_not_lost(conn, conn_cls, dsn, query_between): @pytest.mark.slow @pytest.mark.timing @pytest.mark.parametrize("sleep_on", ["server", "client"]) -def test_notify_query_notify(conn_cls, dsn, sleep_on): +@pytest.mark.parametrize("listen_by", ["callback", "generator"]) +def test_notify_query_notify(conn_cls, dsn, sleep_on, listen_by): e = Event() - by_gen: list[int] = [] - by_cb: list[int] = [] + notifies: list[int] = [] workers = [] def notifier(): @@ -291,12 +291,14 @@ def test_notify_query_notify(conn_cls, dsn, sleep_on): def listener(): with conn_cls.connect(dsn, autocommit=True) as conn: - conn.add_notify_handler(lambda n: by_cb.append(int(n.payload))) + if listen_by == "callback": + conn.add_notify_handler(lambda n: notifies.append(int(n.payload))) conn.execute("listen counter") e.set() for n in conn.notifies(timeout=0.2): - by_gen.append(int(n.payload)) + if listen_by == "generator": + notifies.append(int(n.payload)) if sleep_on == "server": conn.execute("select pg_sleep(0.2)") @@ -305,11 +307,12 @@ def test_notify_query_notify(conn_cls, dsn, sleep_on): sleep(0.2) for n in conn.notifies(timeout=0.2): - by_gen.append(int(n.payload)) + if listen_by == "generator": + notifies.append(int(n.payload)) workers.append(spawn(listener)) e.wait() workers.append(spawn(notifier)) gather(*workers) - assert list(range(3)) == by_cb == by_gen, f"by_gen={by_gen!r}, by_cb={by_cb!r}" + assert notifies == list(range(3)) diff --git a/tests/test_notify_async.py b/tests/test_notify_async.py index ee0d3c06b..404e6a8e3 100644 --- a/tests/test_notify_async.py +++ b/tests/test_notify_async.py @@ -273,10 +273,10 @@ async def test_first_notify_not_lost(aconn, aconn_cls, dsn, query_between): @pytest.mark.slow @pytest.mark.timing @pytest.mark.parametrize("sleep_on", ["server", "client"]) -async def test_notify_query_notify(aconn_cls, dsn, sleep_on): +@pytest.mark.parametrize("listen_by", ["callback", "generator"]) +async def test_notify_query_notify(aconn_cls, dsn, sleep_on, listen_by): e = AEvent() - by_gen: list[int] = [] - by_cb: list[int] = [] + notifies: list[int] = [] workers = [] async def notifier(): @@ -288,12 +288,14 @@ async def test_notify_query_notify(aconn_cls, dsn, sleep_on): async def listener(): async with await aconn_cls.connect(dsn, autocommit=True) as aconn: - aconn.add_notify_handler(lambda n: by_cb.append(int(n.payload))) + if listen_by == "callback": + aconn.add_notify_handler(lambda n: notifies.append(int(n.payload))) await aconn.execute("listen counter") e.set() async for n in aconn.notifies(timeout=0.2): - by_gen.append(int(n.payload)) + if listen_by == "generator": + notifies.append(int(n.payload)) if sleep_on == "server": await aconn.execute("select pg_sleep(0.2)") @@ -302,11 +304,12 @@ async def test_notify_query_notify(aconn_cls, dsn, sleep_on): await asleep(0.2) async for n in aconn.notifies(timeout=0.2): - by_gen.append(int(n.payload)) + if listen_by == "generator": + notifies.append(int(n.payload)) workers.append(spawn(listener)) await e.wait() workers.append(spawn(notifier)) await gather(*workers) - assert list(range(3)) == by_cb == by_gen, f"{by_gen=}, {by_cb=}" + assert notifies == list(range(3))