]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: collect notifies only if no handler was registered fix-1091 1092/head
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 14 May 2025 15:16:03 +0000 (17:16 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 14 May 2025 15:20:31 +0000 (17:20 +0200)
If someone is listening to notifications by using an handler, the
notifies backlog would fill without ever being emptied.

This change has the risk of breaking something if someone is relying on
notifies being received both via callback and via generator, but I don't
know how to satisfy it without creating a leak to users who don't use the
generator. Will ask around...

Fix #1091.

psycopg/psycopg/_connection_base.py
tests/test_notify.py
tests/test_notify_async.py

index 7ac6af1a5ccb233ff7393a7225947c744e033905..59dd17fd822bd2328c118652bc4c67e5d1e5be96 100644 (file)
@@ -376,12 +376,13 @@ class BaseConnection(Generic[Row]):
         enc = self.pgconn._encoding
         n = Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid)
 
-        # `_notifies_backlog` is None if the `notifies()` generator is running
-        if (d := self._notifies_backlog) is not None:
-            d.append(n)
-
-        for cb in self._notify_handlers:
-            cb(n)
+        if self._notify_handlers:
+            for cb in self._notify_handlers:
+                cb(n)
+        else:
+            # `_notifies_backlog` is None if the `notifies()` generator is running
+            if (d := self._notifies_backlog) is not None:
+                d.append(n)
 
     @property
     def prepare_threshold(self) -> int | None:
index e811f0e71a81804f1cbb8dc727be0e9510f460c5..99c99e698dc7ebc2e747131bbc13be9ae57cb742 100644 (file)
@@ -276,10 +276,10 @@ def test_first_notify_not_lost(conn, conn_cls, dsn, query_between):
 @pytest.mark.slow
 @pytest.mark.timing
 @pytest.mark.parametrize("sleep_on", ["server", "client"])
-def test_notify_query_notify(conn_cls, dsn, sleep_on):
+@pytest.mark.parametrize("listen_by", ["callback", "generator"])
+def test_notify_query_notify(conn_cls, dsn, sleep_on, listen_by):
     e = Event()
-    by_gen: list[int] = []
-    by_cb: list[int] = []
+    notifies: list[int] = []
     workers = []
 
     def notifier():
@@ -291,12 +291,14 @@ def test_notify_query_notify(conn_cls, dsn, sleep_on):
 
     def listener():
         with conn_cls.connect(dsn, autocommit=True) as conn:
-            conn.add_notify_handler(lambda n: by_cb.append(int(n.payload)))
+            if listen_by == "callback":
+                conn.add_notify_handler(lambda n: notifies.append(int(n.payload)))
 
             conn.execute("listen counter")
             e.set()
             for n in conn.notifies(timeout=0.2):
-                by_gen.append(int(n.payload))
+                if listen_by == "generator":
+                    notifies.append(int(n.payload))
 
             if sleep_on == "server":
                 conn.execute("select pg_sleep(0.2)")
@@ -305,11 +307,12 @@ def test_notify_query_notify(conn_cls, dsn, sleep_on):
                 sleep(0.2)
 
             for n in conn.notifies(timeout=0.2):
-                by_gen.append(int(n.payload))
+                if listen_by == "generator":
+                    notifies.append(int(n.payload))
 
     workers.append(spawn(listener))
     e.wait()
     workers.append(spawn(notifier))
     gather(*workers)
 
-    assert list(range(3)) == by_cb == by_gen, f"by_gen={by_gen!r}, by_cb={by_cb!r}"
+    assert notifies == list(range(3))
index ee0d3c06b972d2d1b0a0547cdf0c5d63960676e1..404e6a8e3e87466b33df3c717f9de55ebb0197e5 100644 (file)
@@ -273,10 +273,10 @@ async def test_first_notify_not_lost(aconn, aconn_cls, dsn, query_between):
 @pytest.mark.slow
 @pytest.mark.timing
 @pytest.mark.parametrize("sleep_on", ["server", "client"])
-async def test_notify_query_notify(aconn_cls, dsn, sleep_on):
+@pytest.mark.parametrize("listen_by", ["callback", "generator"])
+async def test_notify_query_notify(aconn_cls, dsn, sleep_on, listen_by):
     e = AEvent()
-    by_gen: list[int] = []
-    by_cb: list[int] = []
+    notifies: list[int] = []
     workers = []
 
     async def notifier():
@@ -288,12 +288,14 @@ async def test_notify_query_notify(aconn_cls, dsn, sleep_on):
 
     async def listener():
         async with await aconn_cls.connect(dsn, autocommit=True) as aconn:
-            aconn.add_notify_handler(lambda n: by_cb.append(int(n.payload)))
+            if listen_by == "callback":
+                aconn.add_notify_handler(lambda n: notifies.append(int(n.payload)))
 
             await aconn.execute("listen counter")
             e.set()
             async for n in aconn.notifies(timeout=0.2):
-                by_gen.append(int(n.payload))
+                if listen_by == "generator":
+                    notifies.append(int(n.payload))
 
             if sleep_on == "server":
                 await aconn.execute("select pg_sleep(0.2)")
@@ -302,11 +304,12 @@ async def test_notify_query_notify(aconn_cls, dsn, sleep_on):
                 await asleep(0.2)
 
             async for n in aconn.notifies(timeout=0.2):
-                by_gen.append(int(n.payload))
+                if listen_by == "generator":
+                    notifies.append(int(n.payload))
 
     workers.append(spawn(listener))
     await e.wait()
     workers.append(spawn(notifier))
     await gather(*workers)
 
-    assert list(range(3)) == by_cb == by_gen, f"{by_gen=}, {by_cb=}"
+    assert notifies == list(range(3))