enc = self.pgconn._encoding
n = Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid)
- # `_notifies_backlog` is None if the `notifies()` generator is running
- if (d := self._notifies_backlog) is not None:
- d.append(n)
-
- for cb in self._notify_handlers:
- cb(n)
+ if self._notify_handlers:
+ for cb in self._notify_handlers:
+ cb(n)
+ else:
+ # `_notifies_backlog` is None if the `notifies()` generator is running
+ if (d := self._notifies_backlog) is not None:
+ d.append(n)
@property
def prepare_threshold(self) -> int | None:
@pytest.mark.slow
@pytest.mark.timing
@pytest.mark.parametrize("sleep_on", ["server", "client"])
-def test_notify_query_notify(conn_cls, dsn, sleep_on):
+@pytest.mark.parametrize("listen_by", ["callback", "generator"])
+def test_notify_query_notify(conn_cls, dsn, sleep_on, listen_by):
e = Event()
- by_gen: list[int] = []
- by_cb: list[int] = []
+ notifies: list[int] = []
workers = []
def notifier():
def listener():
with conn_cls.connect(dsn, autocommit=True) as conn:
- conn.add_notify_handler(lambda n: by_cb.append(int(n.payload)))
+ if listen_by == "callback":
+ conn.add_notify_handler(lambda n: notifies.append(int(n.payload)))
conn.execute("listen counter")
e.set()
for n in conn.notifies(timeout=0.2):
- by_gen.append(int(n.payload))
+ if listen_by == "generator":
+ notifies.append(int(n.payload))
if sleep_on == "server":
conn.execute("select pg_sleep(0.2)")
sleep(0.2)
for n in conn.notifies(timeout=0.2):
- by_gen.append(int(n.payload))
+ if listen_by == "generator":
+ notifies.append(int(n.payload))
workers.append(spawn(listener))
e.wait()
workers.append(spawn(notifier))
gather(*workers)
- assert list(range(3)) == by_cb == by_gen, f"by_gen={by_gen!r}, by_cb={by_cb!r}"
+ assert notifies == list(range(3))
@pytest.mark.slow
@pytest.mark.timing
@pytest.mark.parametrize("sleep_on", ["server", "client"])
-async def test_notify_query_notify(aconn_cls, dsn, sleep_on):
+@pytest.mark.parametrize("listen_by", ["callback", "generator"])
+async def test_notify_query_notify(aconn_cls, dsn, sleep_on, listen_by):
e = AEvent()
- by_gen: list[int] = []
- by_cb: list[int] = []
+ notifies: list[int] = []
workers = []
async def notifier():
async def listener():
async with await aconn_cls.connect(dsn, autocommit=True) as aconn:
- aconn.add_notify_handler(lambda n: by_cb.append(int(n.payload)))
+ if listen_by == "callback":
+ aconn.add_notify_handler(lambda n: notifies.append(int(n.payload)))
await aconn.execute("listen counter")
e.set()
async for n in aconn.notifies(timeout=0.2):
- by_gen.append(int(n.payload))
+ if listen_by == "generator":
+ notifies.append(int(n.payload))
if sleep_on == "server":
await aconn.execute("select pg_sleep(0.2)")
await asleep(0.2)
async for n in aconn.notifies(timeout=0.2):
- by_gen.append(int(n.payload))
+ if listen_by == "generator":
+ notifies.append(int(n.payload))
workers.append(spawn(listener))
await e.wait()
workers.append(spawn(notifier))
await gather(*workers)
- assert list(range(3)) == by_cb == by_gen, f"{by_gen=}, {by_cb=}"
+ assert notifies == list(range(3))