results, which are then processed.
"""
fetched = yield from pipeline_communicate(self.pgconn, self.command_queue)
- to_process = [(self.result_queue.popleft(), results) for results in fetched]
- for queued, results in to_process:
- self._process_results(queued, results)
+ exception = None
+ for results in fetched:
+ queued = self.result_queue.popleft()
+ try:
+ self._process_results(queued, results)
+ except e.Error as exc:
+ if exception is None:
+ exception = exc
+ if exception is not None:
+ raise exception
def _fetch_gen(self, *, flush: bool) -> PQGen[None]:
"""Fetch available results from the connection and process them with
self.pgconn.send_flush_request()
yield from send(self.pgconn)
- to_process = []
+ exception = None
while self.result_queue:
results = yield from fetch_many(self.pgconn)
if not results:
# commands.
break
queued = self.result_queue.popleft()
- to_process.append((queued, results))
-
- for queued, results in to_process:
- self._process_results(queued, results)
+ try:
+ self._process_results(queued, results)
+ except e.Error as exc:
+ if exception is None:
+ exception = exc
+ if exception is not None:
+ raise exception
def _process_results(
self, queued: PendingResult, results: List["PGresult"]
raise e.PipelineAborted("pipeline aborted")
else:
cursor, prepinfo = queued
- cursor._set_results_from_pipeline(results)
if prepinfo:
key, prep, name = prepinfo
# Update the prepare state of the query.
cursor._conn._prepared.validate(key, prep, name, results)
+ cursor._set_results_from_pipeline(results)
def _enqueue_sync(self) -> None:
"""Enqueue a PQpipelineSync() command."""
assert res == [0] * 5 + [1] * 5
+def test_prepare_error(conn):
+ """Regression test for GH issue #585.
+
+ An invalid prepared statement, in a pipeline, should be discarded at exit
+ and not reused.
+ """
+ conn.autocommit = True
+ stmt = "INSERT INTO nosuchtable(data) VALUES (%s)"
+ with pytest.raises(psycopg.errors.UndefinedTable):
+ with conn.pipeline():
+ conn.execute(stmt, ["foo"], prepare=True)
+ assert not conn._prepared._names
+ with pytest.raises(psycopg.errors.UndefinedTable):
+ conn.execute(stmt, ["bar"])
+
+
def test_transaction(conn):
notices = []
conn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
assert res == [0] * 5 + [1] * 5
+async def test_prepare_error(aconn):
+ """Regression test for GH issue #585.
+
+ An invalid prepared statement, in a pipeline, should be discarded at exit
+ and not reused.
+ """
+ await aconn.set_autocommit(True)
+ stmt = "INSERT INTO nosuchtable(data) VALUES (%s)"
+ with pytest.raises(psycopg.errors.UndefinedTable):
+ async with aconn.pipeline():
+ await aconn.execute(stmt, ["foo"], prepare=True)
+ assert not aconn._prepared._names
+ with pytest.raises(psycopg.errors.UndefinedTable):
+ await aconn.execute(stmt, ["bar"])
+
+
async def test_transaction(aconn):
notices = []
aconn.add_notice_handler(lambda diag: notices.append(diag.message_primary))