notifications arrives in the same packet.
"""
# Allow interrupting the wait with a signal by reducing a long timeout
- # into shorter interval.
+ # into shorter intervals.
if timeout is not None:
deadline = monotonic() + timeout
interval = min(timeout, _WAIT_INTERVAL)
nreceived = 0
- while True:
- # Collect notifications. Also get the connection encoding if any
- # notification is received to makes sure that they are consistent.
- try:
- with self.lock:
+ with self.lock:
+ enc = self.pgconn._encoding
+ while True:
+ try:
ns = self.wait(notifies(self.pgconn), interval=interval)
- if ns:
- enc = self.pgconn._encoding
- except e._NO_TRACEBACK as ex:
- raise ex.with_traceback(None)
+ except e._NO_TRACEBACK as ex:
+ raise ex.with_traceback(None)
- # Emit the notifications received.
- for pgn in ns:
- n = Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid)
- yield n
- nreceived += 1
-
- # Stop if we have received enough notifications.
- if stop_after is not None and nreceived >= stop_after:
- break
+ # Emit the notifications received.
+ for pgn in ns:
+ n = Notify(
+ pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid
+ )
+ yield n
+ nreceived += 1
- # Check the deadline after the loop to ensure that timeout=0
- # polls at least once.
- if deadline:
- interval = min(_WAIT_INTERVAL, deadline - monotonic())
- if interval < 0.0:
+ # Stop if we have received enough notifications.
+ if stop_after is not None and nreceived >= stop_after:
break
+ # Check the deadline after the loop to ensure that timeout=0
+ # polls at least once.
+ if deadline:
+ interval = min(_WAIT_INTERVAL, deadline - monotonic())
+ if interval < 0.0:
+ break
+
@contextmanager
def pipeline(self) -> Iterator[Pipeline]:
"""Context manager to switch the connection into pipeline mode."""
notifications arrives in the same packet.
"""
# Allow interrupting the wait with a signal by reducing a long timeout
- # into shorter interval.
+ # into shorter intervals.
if timeout is not None:
deadline = monotonic() + timeout
interval = min(timeout, _WAIT_INTERVAL)
nreceived = 0
- while True:
- # Collect notifications. Also get the connection encoding if any
- # notification is received to makes sure that they are consistent.
- try:
- async with self.lock:
+ async with self.lock:
+ enc = self.pgconn._encoding
+ while True:
+ try:
ns = await self.wait(notifies(self.pgconn), interval=interval)
- if ns:
- enc = self.pgconn._encoding
- except e._NO_TRACEBACK as ex:
- raise ex.with_traceback(None)
+ except e._NO_TRACEBACK as ex:
+ raise ex.with_traceback(None)
- # Emit the notifications received.
- for pgn in ns:
- n = Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid)
- yield n
- nreceived += 1
-
- # Stop if we have received enough notifications.
- if stop_after is not None and nreceived >= stop_after:
- break
+ # Emit the notifications received.
+ for pgn in ns:
+ n = Notify(
+ pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid
+ )
+ yield n
+ nreceived += 1
- # Check the deadline after the loop to ensure that timeout=0
- # polls at least once.
- if deadline:
- interval = min(_WAIT_INTERVAL, deadline - monotonic())
- if interval < 0.0:
+ # Stop if we have received enough notifications.
+ if stop_after is not None and nreceived >= stop_after:
break
+ # Check the deadline after the loop to ensure that timeout=0
+ # polls at least once.
+ if deadline:
+ interval = min(_WAIT_INTERVAL, deadline - monotonic())
+ if interval < 0.0:
+ break
+
@asynccontextmanager
async def pipeline(self) -> AsyncIterator[AsyncPipeline]:
"""Context manager to switch the connection into pipeline mode."""
assert ns[1].payload == "2"
finally:
gather(worker)
+
+
+@pytest.mark.slow
+def test_notifies_blocking(conn):
+
+ def listener():
+ for _ in conn.notifies(timeout=1):
+ pass
+
+ worker = spawn(listener)
+ try:
+ # Make sure the listener is listening
+ if not conn.lock.locked():
+ sleep(0.01)
+
+ t0 = time()
+ conn.execute("select 1")
+ dt = time() - t0
+ finally:
+ gather(worker)
+
+ assert dt > 0.5
assert ns[1].payload == "2"
finally:
await gather(worker)
+
+
+@pytest.mark.slow
+async def test_notifies_blocking(aconn):
+ async def listener():
+ async for _ in aconn.notifies(timeout=1):
+ pass
+
+ worker = spawn(listener)
+ try:
+ # Make sure the listener is listening
+ if not aconn.lock.locked():
+ await asleep(0.01)
+
+ t0 = time()
+ await aconn.execute("select 1")
+ dt = time() - t0
+ finally:
+ await gather(worker)
+
+ assert dt > 0.5