if self._pgconn.pipeline_status:
raise e.ProgrammingError("stream() cannot be used in pipeline mode")
- try:
- with self._conn.lock:
+ with self._conn.lock:
+
+ try:
self._conn.wait(self._stream_send_gen(query, params, binary=binary))
first = True
while self._conn.wait(self._stream_fetchone_gen(first)):
yield rec
first = False
- except e.Error as ex:
- raise ex.with_traceback(None)
+ except e.Error as ex:
+ raise ex.with_traceback(None)
+
+ finally:
+ if self._pgconn.transaction_status == ACTIVE:
+ # Try to cancel the query, then consume the results
+ # already received.
+ self._conn.cancel()
+ try:
+ while self._conn.wait(self._stream_fetchone_gen(first=False)):
+ pass
+ except Exception:
+ pass
- finally:
- if self._pgconn.transaction_status == ACTIVE:
- # Try to cancel the query, then consume the results already received.
- self._conn.cancel()
- try:
- while self._conn.wait(self._stream_fetchone_gen(first=False)):
+ # Try to get out of ACTIVE state. Just do a single attempt, which
+ # should work to recover from an error or query cancelled.
+ try:
+ self._conn.wait(self._stream_fetchone_gen(first=False))
+ except Exception:
pass
- except Exception:
- pass
-
- # Try to get out of ACTIVE state. Just do a single attempt, which
- # should work to recover from an error or query cancelled.
- try:
- self._conn.wait(self._stream_fetchone_gen(first=False))
- except Exception:
- pass
def fetchone(self) -> Optional[Row]:
"""
if self._pgconn.pipeline_status:
raise e.ProgrammingError("stream() cannot be used in pipeline mode")
- try:
- async with self._conn.lock:
+ async with self._conn.lock:
+
+ try:
await self._conn.wait(
self._stream_send_gen(query, params, binary=binary)
)
yield rec
first = False
- except e.Error as ex:
- raise ex.with_traceback(None)
+ except e.Error as ex:
+ raise ex.with_traceback(None)
+
+ finally:
+ if self._pgconn.transaction_status == ACTIVE:
+ # Try to cancel the query, then consume the results
+ # already received.
+ self._conn.cancel()
+ try:
+ while await self._conn.wait(
+ self._stream_fetchone_gen(first=False)
+ ):
+ pass
+ except Exception:
+ pass
- finally:
- if self._pgconn.transaction_status == ACTIVE:
- # Try to cancel the query, then consume the results already received.
- self._conn.cancel()
- try:
- while await self._conn.wait(self._stream_fetchone_gen(first=False)):
+ # Try to get out of ACTIVE state. Just do a single attempt, which
+ # should work to recover from an error or query cancelled.
+ try:
+ await self._conn.wait(self._stream_fetchone_gen(first=False))
+ except Exception:
pass
- except Exception:
- pass
-
- # Try to get out of ACTIVE state. Just do a single attempt, which
- # should work to recover from an error or query cancelled.
- try:
- await self._conn.wait(self._stream_fetchone_gen(first=False))
- except Exception:
- pass
async def fetchone(self) -> Optional[Row]:
await self._fetch_pipeline()