Psycopg 3.2.4 (unreleased)
^^^^^^^^^^^^^^^^^^^^^^^^^^
-- Don't lose notifies received between two `~Connection.notifies()` calls
- (:ticket:`#962`).
+- Don't lose notifies received whilst the `~Connection.notifies()` iterator
+ is not running (:ticket:`#962`).
- Make sure that the notifies callback is called during the use of the
`~Connection.notifies()` generator (:ticket:`#972`).
pgconn.notify_handler = partial(BaseConnection._notify_handler, wself)
# Gather notifies when the notifies() generator is not running.
- # This handler is registered after notifies() is used te first time.
- # backlog = None means that the handler hasn't been registered.
- self._notifies_backlog: Deque[Notify] | None = None
+ self._notifies_backlog = Deque[Notify]()
self._notifies_backlog_handler = partial(
BaseConnection._add_notify_to_backlog, wself
)
+ self.add_notify_handler(self._notifies_backlog_handler)
# Attribute is only set if the connection is from a pool so we can tell
# apart a connection in the pool too (when _pool = None)
from .rows import Row, RowFactory, tuple_row, args_row
from .adapt import AdaptersMap
from ._enums import IsolationLevel
-from ._compat import Deque, Self
+from ._compat import Self
from .conninfo import make_conninfo, conninfo_to_dict
from .conninfo import conninfo_attempts, timeout_from_conninfo
from ._pipeline import Pipeline
with self.lock:
enc = self.pgconn._encoding
- # If the backlog is set to not-None, then the handler is also set.
# Remove the handler for the duration of this critical section to
# avoid reporting notifies twice.
- if self._notifies_backlog is not None:
- self.remove_notify_handler(self._notifies_backlog_handler)
+ self.remove_notify_handler(self._notifies_backlog_handler)
try:
while True:
if interval < 0.0:
break
finally:
- # Install, or re-install, the backlog notify handler
- # to catch notifications received while the generator was off.
- if self._notifies_backlog is None:
- self._notifies_backlog = Deque()
self.add_notify_handler(self._notifies_backlog_handler)
@contextmanager
from .rows import Row, AsyncRowFactory, tuple_row, args_row
from .adapt import AdaptersMap
from ._enums import IsolationLevel
-from ._compat import Deque, Self
+from ._compat import Self
from .conninfo import make_conninfo, conninfo_to_dict
from .conninfo import conninfo_attempts_async, timeout_from_conninfo
from ._pipeline import AsyncPipeline
async with self.lock:
enc = self.pgconn._encoding
- # If the backlog is set to not-None, then the handler is also set.
# Remove the handler for the duration of this critical section to
# avoid reporting notifies twice.
- if self._notifies_backlog is not None:
- self.remove_notify_handler(self._notifies_backlog_handler)
+ self.remove_notify_handler(self._notifies_backlog_handler)
try:
while True:
if interval < 0.0:
break
finally:
- # Install, or re-install, the backlog notify handler
- # to catch notifications received while the generator was off.
- if self._notifies_backlog is None:
- self._notifies_backlog = Deque()
-
self.add_notify_handler(self._notifies_backlog_handler)
@asynccontextmanager
assert n2
+@pytest.mark.parametrize("query_between", [True, False])
+def test_first_notify_not_lost(conn, conn_cls, dsn, query_between):
+ conn.set_autocommit(True)
+ conn.execute("listen foo")
+
+ with conn_cls.connect(dsn, autocommit=True) as conn2:
+ conn2.execute("notify foo, 'hi'")
+
+ if query_between:
+ conn.execute("select 1")
+
+ n = None
+ for n in conn.notifies(timeout=1, stop_after=1):
+ pass
+ assert n
+
+
@pytest.mark.slow
@pytest.mark.timing
@pytest.mark.parametrize("sleep_on", ["server", "client"])
assert n2
+@pytest.mark.parametrize("query_between", [True, False])
+async def test_first_notify_not_lost(aconn, aconn_cls, dsn, query_between):
+ await aconn.set_autocommit(True)
+ await aconn.execute("listen foo")
+
+ async with await aconn_cls.connect(dsn, autocommit=True) as conn2:
+ await conn2.execute("notify foo, 'hi'")
+
+ if query_between:
+ await aconn.execute("select 1")
+
+ n = None
+ async for n in aconn.notifies(timeout=1, stop_after=1):
+ pass
+ assert n
+
+
@pytest.mark.slow
@pytest.mark.timing
@pytest.mark.parametrize("sleep_on", ["server", "client"])