self._prepared: PrepareManager = PrepareManager()
self._tpc: tuple[Xid, bool] | None = None # xid, prepared
+ # Gather notifies when the notifies() generator is not running.
+ # It will be set to None during `notifies()` generator run.
+ self._notifies_backlog: deque[Notify] | None = deque()
+
wself = ref(self)
pgconn.notice_handler = partial(BaseConnection._notice_handler, wself)
pgconn.notify_handler = partial(BaseConnection._notify_handler, wself)
- # Gather notifies when the notifies() generator is not running.
- self._notifies_backlog = deque[Notify]()
- self._notifies_backlog_handler = partial(
- BaseConnection._add_notify_to_backlog, wself
- )
- self.add_notify_handler(self._notifies_backlog_handler)
-
# Attribute is only set if the connection is from a pool so we can tell
# apart a connection in the pool too (when _pool = None)
self._pool: BasePool | None
def _notify_handler(
wself: ReferenceType[BaseConnection[Row]], pgn: pq.PGnotify
) -> None:
- self = wself()
- if not (self and self._notify_handlers):
+ if not (self := wself()):
return
enc = self.pgconn._encoding
n = Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid)
+
+ # `_notifies_backlog` is None if the `notifies()` generator is running
+ if (d := self._notifies_backlog) is not None:
+ d.append(n)
+
for cb in self._notify_handlers:
cb(n)
- @staticmethod
- def _add_notify_to_backlog(
- wself: ReferenceType[BaseConnection[Row]], notify: Notify
- ) -> None:
- self = wself()
- if not self or self._notifies_backlog is None:
- return
- self._notifies_backlog.append(notify)
-
@property
def prepare_threshold(self) -> int | None:
"""
with self.lock:
enc = self.pgconn._encoding
- # Remove the handler for the duration of this critical section to
- # avoid reporting notifies twice.
- self.remove_notify_handler(self._notifies_backlog_handler)
+ # Remove the backlog deque for the duration of this critical
+ # section to avoid reporting notifies twice.
+ self._notifies_backlog, d = (None, self._notifies_backlog)
try:
while True:
# if notifies were received when the generator was off,
# return them in a first batch.
- if self._notifies_backlog:
- while self._notifies_backlog:
- yield self._notifies_backlog.popleft()
+ if d:
+ while d:
+ yield d.popleft()
nreceived += 1
else:
try:
if interval < 0.0:
break
finally:
- self.add_notify_handler(self._notifies_backlog_handler)
+ self._notifies_backlog = d
@contextmanager
def pipeline(self) -> Iterator[Pipeline]:
async with self.lock:
enc = self.pgconn._encoding
- # Remove the handler for the duration of this critical section to
- # avoid reporting notifies twice.
- self.remove_notify_handler(self._notifies_backlog_handler)
+ # Remove the backlog deque for the duration of this critical
+ # section to avoid reporting notifies twice.
+ self._notifies_backlog, d = None, self._notifies_backlog
try:
while True:
# if notifies were received when the generator was off,
# return them in a first batch.
- if self._notifies_backlog:
- while self._notifies_backlog:
- yield self._notifies_backlog.popleft()
+ if d:
+ while d:
+ yield d.popleft()
nreceived += 1
else:
try:
if interval < 0.0:
break
finally:
- self.add_notify_handler(self._notifies_backlog_handler)
+ self._notifies_backlog = d
@asynccontextmanager
async def pipeline(self) -> AsyncIterator[AsyncPipeline]: