in C extension (:ticket:`#1280`).
- Fix client-side adaptation of enums whose name require quotes
(:ticket:`#1298`).
+- Consistently populate `~Cursor.statusmessage` after `~Cursor.executemany()`
+ (:ticket:`#1302`).
Current release
__slots__ = """
_conn format _adapters arraysize _closed _results pgresult _pos
_iresult _rowcount _query _tx _last_query _row_factory _make_row
- _pgconn _execmany_returning
+ _pgconn _execmany_returning _statusmessage
__weakref__
""".split()
self._pos = 0
self._iresult = 0
self._rowcount = -1
+ self._statusmessage: bytes | None = None
self._query: PostgresQuery | None
# None if executemany() not executing, True/False according to returning state
self._execmany_returning: bool | None = None
`!None` if the cursor doesn't have a result available.
"""
- msg = self.pgresult.command_status if self.pgresult else None
- return msg.decode() if msg else None
+ return self._statusmessage.decode() if self._statusmessage else None
def _make_row_maker(self) -> RowMaker[Row]:
raise NotImplementedError
nrows = self.pgresult.command_tuples
self._rowcount = nrows if nrows is not None else -1
+ self._statusmessage = res.command_status
self._make_row = self._make_row_maker()
def _set_results(self, results: list[PGresult]) -> None:
self._select_current_result(0)
else:
# In non-returning case, set rowcount to the cumulated number of
- # rows of executed queries.
- for res in results:
- self._rowcount += res.command_tuples or 0
+ # rows of executed queries. Keep the last result's command_status
+ # so that statusmessage is available after the batch completes.
+ if results:
+ self._statusmessage = results[-1].command_status
+ for res in results:
+ self._rowcount += res.command_tuples or 0
@classmethod
def _loaders_changed(
cur.execute("select generate_series(1, 10)")
assert cur.statusmessage == "SELECT 10"
+ cur.execute("")
+ assert cur.statusmessage is None
+
cur.execute("create table statusmessage ()")
assert cur.statusmessage == "CREATE TABLE"
assert cur.rowcount == 2
+@pytest.mark.parametrize("returning", [False, True])
+def test_executemany_statusmessage(conn, execmany, returning):
+ cur = conn.cursor()
+ cur.executemany(
+ ph(cur, "insert into execmany(num, data) values (%s, %s)"),
+ [(10, "hello"), (20, "world")],
+ returning=returning,
+ )
+ assert cur.rowcount == (1 if returning else 2)
+ assert cur.statusmessage is not None
+ assert cur.statusmessage.startswith("INSERT")
+
+ cur.executemany(
+ ph(cur, "insert into execmany(num, data) values (%s, %s)"),
+ [],
+ returning=returning,
+ )
+ assert cur.statusmessage is None
+
+
def test_executemany_returning(conn, execmany):
cur = conn.cursor()
cur.executemany(
await cur.execute("select generate_series(1, 10)")
assert cur.statusmessage == "SELECT 10"
+ await cur.execute("")
+ assert cur.statusmessage is None
+
await cur.execute("create table statusmessage ()")
assert cur.statusmessage == "CREATE TABLE"
assert cur.rowcount == 2
+@pytest.mark.parametrize("returning", [False, True])
+async def test_executemany_statusmessage(aconn, execmany, returning):
+ cur = aconn.cursor()
+ await cur.executemany(
+ ph(cur, "insert into execmany(num, data) values (%s, %s)"),
+ [(10, "hello"), (20, "world")],
+ returning=returning,
+ )
+ assert cur.rowcount == (1 if returning else 2)
+ assert cur.statusmessage is not None
+ assert cur.statusmessage.startswith("INSERT")
+
+ await cur.executemany(
+ ph(cur, "insert into execmany(num, data) values (%s, %s)"),
+ [],
+ returning=returning,
+ )
+ assert cur.statusmessage is None
+
+
async def test_executemany_returning(aconn, execmany):
cur = aconn.cursor()
await cur.executemany(