From eac8115523d1b569e279eaec5fc31d76da25ef55 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sun, 7 Sep 2025 00:14:48 +0200 Subject: [PATCH] fix: raise a warning when notifies generator and handlers are used together --- psycopg/psycopg/connection.py | 6 ++++ psycopg/psycopg/connection_async.py | 8 +++++ tests/test_notify.py | 55 +++++++++++++++++++++-------- tests/test_notify_async.py | 55 +++++++++++++++++++++-------- 4 files changed, 94 insertions(+), 30 deletions(-) diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index 2a55f0485..f6af3edc3 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -354,6 +354,12 @@ class Connection(BaseConnection[Row]): nreceived = 0 + if self._notify_handlers: + warnings.warn( + "using 'notifies()' together with notifies handlers on the same connection is not reliable. Please use only one of thees methods", + RuntimeWarning, + ) + with self.lock: enc = self.pgconn._encoding diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index cc9bd7247..28e369191 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -383,6 +383,14 @@ class AsyncConnection(BaseConnection[Row]): nreceived = 0 + if self._notify_handlers: + warnings.warn( + "using 'notifies()' together with notifies handlers on the" + " same connection is not reliable." + " Please use only one of thees methods", + RuntimeWarning, + ) + async with self.lock: enc = self.pgconn._encoding diff --git a/tests/test_notify.py b/tests/test_notify.py index 99c99e698..fde74058d 100644 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -223,7 +223,13 @@ def test_notifies_blocking(conn): @pytest.mark.slow -def test_generator_and_handler(conn, conn_cls, dsn): +def test_generator_and_handler(conn, conn_cls, dsn, recwarn): + # NOTE: we don't support generator+handlers anymore. So, if in the future + # this behaviour will change, we will not consider it a regression. However + # we will want to keep the warning check. + + recwarn.clear() + conn.set_autocommit(True) conn.execute("listen foo") @@ -255,6 +261,9 @@ def test_generator_and_handler(conn, conn_cls, dsn): assert n1 assert n2 + msg = str(recwarn.pop(RuntimeWarning).message) + assert "notifies()" in msg + @pytest.mark.parametrize("query_between", [True, False]) def test_first_notify_not_lost(conn, conn_cls, dsn, query_between): @@ -289,25 +298,41 @@ def test_notify_query_notify(conn_cls, dsn, sleep_on, listen_by): conn.execute("select pg_notify('counter', %s)", (str(i),)) sleep(0.2) - def listener(): - with conn_cls.connect(dsn, autocommit=True) as conn: - if listen_by == "callback": + def nap(conn): + if sleep_on == "server": + conn.execute("select pg_sleep(0.2)") + else: + assert sleep_on == "client" + sleep(0.2) + + if listen_by == "callback": + + def listener(): + with conn_cls.connect(dsn, autocommit=True) as conn: conn.add_notify_handler(lambda n: notifies.append(int(n.payload))) - conn.execute("listen counter") - e.set() - for n in conn.notifies(timeout=0.2): - if listen_by == "generator": + conn.execute("listen counter") + e.set() + + nap(conn) + conn.execute("") + nap(conn) + conn.execute("") + nap(conn) + conn.execute("") + + else: + + def listener(): + with conn_cls.connect(dsn, autocommit=True) as conn: + conn.execute("listen counter") + e.set() + for n in conn.notifies(timeout=0.2): notifies.append(int(n.payload)) - if sleep_on == "server": - conn.execute("select pg_sleep(0.2)") - else: - assert sleep_on == "client" - sleep(0.2) + nap(conn) - for n in conn.notifies(timeout=0.2): - if listen_by == "generator": + for n in conn.notifies(timeout=0.2): notifies.append(int(n.payload)) workers.append(spawn(listener)) diff --git a/tests/test_notify_async.py b/tests/test_notify_async.py index 404e6a8e3..6faf9be4f 100644 --- a/tests/test_notify_async.py +++ b/tests/test_notify_async.py @@ -219,7 +219,13 @@ async def test_notifies_blocking(aconn): @pytest.mark.slow -async def test_generator_and_handler(aconn, aconn_cls, dsn): +async def test_generator_and_handler(aconn, aconn_cls, dsn, recwarn): + # NOTE: we don't support generator+handlers anymore. So, if in the future + # this behaviour will change, we will not consider it a regression. However + # we will want to keep the warning check. + + recwarn.clear() + await aconn.set_autocommit(True) await aconn.execute("listen foo") @@ -252,6 +258,9 @@ async def test_generator_and_handler(aconn, aconn_cls, dsn): assert n1 assert n2 + msg = str(recwarn.pop(RuntimeWarning).message) + assert "notifies()" in msg + @pytest.mark.parametrize("query_between", [True, False]) async def test_first_notify_not_lost(aconn, aconn_cls, dsn, query_between): @@ -286,25 +295,41 @@ async def test_notify_query_notify(aconn_cls, dsn, sleep_on, listen_by): await aconn.execute("select pg_notify('counter', %s)", (str(i),)) await asleep(0.2) - async def listener(): - async with await aconn_cls.connect(dsn, autocommit=True) as aconn: - if listen_by == "callback": + async def nap(aconn): + if sleep_on == "server": + await aconn.execute("select pg_sleep(0.2)") + else: + assert sleep_on == "client" + await asleep(0.2) + + if listen_by == "callback": + + async def listener(): + async with await aconn_cls.connect(dsn, autocommit=True) as aconn: aconn.add_notify_handler(lambda n: notifies.append(int(n.payload))) - await aconn.execute("listen counter") - e.set() - async for n in aconn.notifies(timeout=0.2): - if listen_by == "generator": + await aconn.execute("listen counter") + e.set() + + await nap(aconn) + await aconn.execute("") + await nap(aconn) + await aconn.execute("") + await nap(aconn) + await aconn.execute("") + + else: + + async def listener(): + async with await aconn_cls.connect(dsn, autocommit=True) as aconn: + await aconn.execute("listen counter") + e.set() + async for n in aconn.notifies(timeout=0.2): notifies.append(int(n.payload)) - if sleep_on == "server": - await aconn.execute("select pg_sleep(0.2)") - else: - assert sleep_on == "client" - await asleep(0.2) + await nap(aconn) - async for n in aconn.notifies(timeout=0.2): - if listen_by == "generator": + async for n in aconn.notifies(timeout=0.2): notifies.append(int(n.payload)) workers.append(spawn(listener)) -- 2.47.3