]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: raise a warning when notifies generator and handlers are used together
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 6 Sep 2025 22:14:48 +0000 (00:14 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 6 Sep 2025 23:53:50 +0000 (01:53 +0200)
.flake8
psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
tests/test_notify.py
tests/test_notify_async.py

diff --git a/.flake8 b/.flake8
index 79e029388048c5ca75725c6bd2ad3353bacc472f..141e4da14d672e6ce64da579a66d9f0e2e4d2beb 100644 (file)
--- a/.flake8
+++ b/.flake8
@@ -12,6 +12,7 @@ per-file-ignores =
     psycopg/psycopg/errors.py: E125, E128, E302
 
     # Allow concatenated string literals from async_to_sync
+    psycopg/psycopg/connection.py
     psycopg_pool/psycopg_pool/pool.py: E501
 
     # Pytest's importorskip() getting in the way
index 745715673168bfb790ffe8112a44163b7b9b6807..00d23d6ecaf12072f612dd7a1c7325f6f4332c77 100644 (file)
@@ -10,6 +10,7 @@ Psycopg connection object (sync version)
 from __future__ import annotations
 
 import logging
+import warnings
 from time import monotonic
 from types import TracebackType
 from typing import TYPE_CHECKING, Any, cast, overload
@@ -343,6 +344,12 @@ class Connection(BaseConnection[Row]):
 
         nreceived = 0
 
+        if self._notify_handlers:
+            warnings.warn(
+                "using 'notifies()' together with notifies handlers on the same connection is not reliable. Please use only one of thees methods",
+                RuntimeWarning,
+            )
+
         with self.lock:
             enc = self.pgconn._encoding
 
index 037750efca2e819d63134e40b793c56904227d69..dabdab036f7ec1b589becdbb784033aee61f2bb2 100644 (file)
@@ -7,6 +7,7 @@ Psycopg connection object (async version)
 from __future__ import annotations
 
 import logging
+import warnings
 from time import monotonic
 from types import TracebackType
 from typing import TYPE_CHECKING, Any, cast, overload
@@ -363,6 +364,14 @@ class AsyncConnection(BaseConnection[Row]):
 
         nreceived = 0
 
+        if self._notify_handlers:
+            warnings.warn(
+                "using 'notifies()' together with notifies handlers on the"
+                " same connection is not reliable."
+                " Please use only one of thees methods",
+                RuntimeWarning,
+            )
+
         async with self.lock:
             enc = self.pgconn._encoding
 
index 99c99e698dc7ebc2e747131bbc13be9ae57cb742..fde74058dc76bab153b1b05f4b7dcce05487c3f0 100644 (file)
@@ -223,7 +223,13 @@ def test_notifies_blocking(conn):
 
 
 @pytest.mark.slow
-def test_generator_and_handler(conn, conn_cls, dsn):
+def test_generator_and_handler(conn, conn_cls, dsn, recwarn):
+    # NOTE: we don't support generator+handlers anymore. So, if in the future
+    # this behaviour will change, we will not consider it a regression. However
+    # we will want to keep the warning check.
+
+    recwarn.clear()
+
     conn.set_autocommit(True)
     conn.execute("listen foo")
 
@@ -255,6 +261,9 @@ def test_generator_and_handler(conn, conn_cls, dsn):
     assert n1
     assert n2
 
+    msg = str(recwarn.pop(RuntimeWarning).message)
+    assert "notifies()" in msg
+
 
 @pytest.mark.parametrize("query_between", [True, False])
 def test_first_notify_not_lost(conn, conn_cls, dsn, query_between):
@@ -289,25 +298,41 @@ def test_notify_query_notify(conn_cls, dsn, sleep_on, listen_by):
                 conn.execute("select pg_notify('counter', %s)", (str(i),))
                 sleep(0.2)
 
-    def listener():
-        with conn_cls.connect(dsn, autocommit=True) as conn:
-            if listen_by == "callback":
+    def nap(conn):
+        if sleep_on == "server":
+            conn.execute("select pg_sleep(0.2)")
+        else:
+            assert sleep_on == "client"
+            sleep(0.2)
+
+    if listen_by == "callback":
+
+        def listener():
+            with conn_cls.connect(dsn, autocommit=True) as conn:
                 conn.add_notify_handler(lambda n: notifies.append(int(n.payload)))
 
-            conn.execute("listen counter")
-            e.set()
-            for n in conn.notifies(timeout=0.2):
-                if listen_by == "generator":
+                conn.execute("listen counter")
+                e.set()
+
+                nap(conn)
+                conn.execute("")
+                nap(conn)
+                conn.execute("")
+                nap(conn)
+                conn.execute("")
+
+    else:
+
+        def listener():
+            with conn_cls.connect(dsn, autocommit=True) as conn:
+                conn.execute("listen counter")
+                e.set()
+                for n in conn.notifies(timeout=0.2):
                     notifies.append(int(n.payload))
 
-            if sleep_on == "server":
-                conn.execute("select pg_sleep(0.2)")
-            else:
-                assert sleep_on == "client"
-                sleep(0.2)
+                nap(conn)
 
-            for n in conn.notifies(timeout=0.2):
-                if listen_by == "generator":
+                for n in conn.notifies(timeout=0.2):
                     notifies.append(int(n.payload))
 
     workers.append(spawn(listener))
index 404e6a8e3e87466b33df3c717f9de55ebb0197e5..6faf9be4f1c0ab96ddb360f339857e763618a992 100644 (file)
@@ -219,7 +219,13 @@ async def test_notifies_blocking(aconn):
 
 
 @pytest.mark.slow
-async def test_generator_and_handler(aconn, aconn_cls, dsn):
+async def test_generator_and_handler(aconn, aconn_cls, dsn, recwarn):
+    # NOTE: we don't support generator+handlers anymore. So, if in the future
+    # this behaviour will change, we will not consider it a regression. However
+    # we will want to keep the warning check.
+
+    recwarn.clear()
+
     await aconn.set_autocommit(True)
     await aconn.execute("listen foo")
 
@@ -252,6 +258,9 @@ async def test_generator_and_handler(aconn, aconn_cls, dsn):
     assert n1
     assert n2
 
+    msg = str(recwarn.pop(RuntimeWarning).message)
+    assert "notifies()" in msg
+
 
 @pytest.mark.parametrize("query_between", [True, False])
 async def test_first_notify_not_lost(aconn, aconn_cls, dsn, query_between):
@@ -286,25 +295,41 @@ async def test_notify_query_notify(aconn_cls, dsn, sleep_on, listen_by):
                 await aconn.execute("select pg_notify('counter', %s)", (str(i),))
                 await asleep(0.2)
 
-    async def listener():
-        async with await aconn_cls.connect(dsn, autocommit=True) as aconn:
-            if listen_by == "callback":
+    async def nap(aconn):
+        if sleep_on == "server":
+            await aconn.execute("select pg_sleep(0.2)")
+        else:
+            assert sleep_on == "client"
+            await asleep(0.2)
+
+    if listen_by == "callback":
+
+        async def listener():
+            async with await aconn_cls.connect(dsn, autocommit=True) as aconn:
                 aconn.add_notify_handler(lambda n: notifies.append(int(n.payload)))
 
-            await aconn.execute("listen counter")
-            e.set()
-            async for n in aconn.notifies(timeout=0.2):
-                if listen_by == "generator":
+                await aconn.execute("listen counter")
+                e.set()
+
+                await nap(aconn)
+                await aconn.execute("")
+                await nap(aconn)
+                await aconn.execute("")
+                await nap(aconn)
+                await aconn.execute("")
+
+    else:
+
+        async def listener():
+            async with await aconn_cls.connect(dsn, autocommit=True) as aconn:
+                await aconn.execute("listen counter")
+                e.set()
+                async for n in aconn.notifies(timeout=0.2):
                     notifies.append(int(n.payload))
 
-            if sleep_on == "server":
-                await aconn.execute("select pg_sleep(0.2)")
-            else:
-                assert sleep_on == "client"
-                await asleep(0.2)
+                await nap(aconn)
 
-            async for n in aconn.notifies(timeout=0.2):
-                if listen_by == "generator":
+                async for n in aconn.notifies(timeout=0.2):
                     notifies.append(int(n.payload))
 
     workers.append(spawn(listener))