elif status == PIPELINE_ABORTED:
raise e.PipelineAborted("pipeline aborted")
else:
- raise e.ProgrammingError("the last operation didn't produce a result")
+ if res.command_status:
+ detail = f" (command status: {res.command_status.decode()})"
+ else:
+ try:
+ status_name = pq.ExecStatus(status).name
+ except ValueError:
+ status_name = f"{status} - unknown"
+ detail = f" (result status: {status_name})"
+ raise e.ProgrammingError(
+ f"the last operation didn't produce records{detail}"
+ )
def _check_copy_result(self, result: PGresult) -> None:
"""
def test_typeinfo(conn):
info = TypeInfo.fetch(conn, "jsonb")
assert info is not None
+
+
+def test_error_no_result(conn):
+ cur = conn.cursor()
+ with pytest.raises(psycopg.ProgrammingError, match="no result available"):
+ cur.fetchone()
+
+ cur.execute("set timezone to utc")
+ with pytest.raises(
+ psycopg.ProgrammingError, match="last operation.*command status: SET"
+ ):
+ cur.fetchone()
+
+ cur.execute("")
+ with pytest.raises(
+ psycopg.ProgrammingError, match="last operation.*result status: EMPTY_QUERY"
+ ):
+ cur.fetchone()
async def test_typeinfo(aconn):
info = await TypeInfo.fetch(aconn, "jsonb")
assert info is not None
+
+
+async def test_error_no_result(aconn):
+ cur = aconn.cursor()
+ with pytest.raises(psycopg.ProgrammingError, match="no result available"):
+ await cur.fetchone()
+
+ await cur.execute("set timezone to utc")
+ with pytest.raises(
+ psycopg.ProgrammingError, match="last operation.*command status: SET"
+ ):
+ await cur.fetchone()
+
+ await cur.execute("")
+ with pytest.raises(
+ psycopg.ProgrammingError, match="last operation.*result status: EMPTY_QUERY"
+ ):
+ await cur.fetchone()