@contextmanager
def pipeline(self) -> Iterator[None]:
"""Context manager to switch the connection into pipeline mode."""
- if self._pipeline is not None:
- # calling pipeline recursively is no-op.
+ with self.lock:
+ if self._pipeline is None:
+ # We must enter pipeline mode: create a new one
+ pipeline = self._pipeline = Pipeline(self.pgconn)
+ else:
+ # we are already in pipeline mode: bail out as soon as we
+ # leave the lock block.
+ pipeline = None
+
+ if not pipeline:
+ # No-op re-entered inner pipeline block.
yield
return
- pipeline = self._pipeline = Pipeline(self.pgconn)
try:
with pipeline:
try:
@asynccontextmanager
async def pipeline(self) -> AsyncIterator[None]:
"""Context manager to switch the connection into pipeline mode."""
- if self._pipeline is not None:
- # calling pipeline recursively is no-op.
+ async with self.lock:
+ if self._pipeline is None:
+ # We must enter pipeline mode: create a new one
+ pipeline = self._pipeline = AsyncPipeline(self.pgconn)
+ else:
+ # we are already in pipeline mode: bail out as soon as we
+ # leave the lock block.
+ pipeline = None
+
+ if not pipeline:
+ # No-op re-entered inner pipeline block.
yield
return
- pipeline = self._pipeline = AsyncPipeline(self.pgconn)
try:
async with pipeline:
try: