self._last_query: Optional[Query] = None
self._reset()
- def _reset(self) -> None:
+ def _reset(self, reset_query: bool = True) -> None:
self._results: List["PGresult"] = []
self.pgresult: Optional["PGresult"] = None
self._pos = 0
self._iresult = 0
self._rowcount = -1
- self._query: Optional[PostgresQuery] = None
+ self._query: Optional[PostgresQuery]
+ if reset_query:
+ self._query = None
def __repr__(self) -> str:
cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
self._pos = newpos
def _close(self) -> None:
+ """Non-blocking part of closing. Common to sync/async."""
+ # Don't reset the query because it may be useful to investigate after
+ # an error.
+ self._reset(reset_query=False)
self._closed = True
assert w() is None
+def test_pgresult(conn):
+ cur = conn.cursor()
+ cur.execute("select 1")
+ assert cur.pgresult
+ cur.close()
+ assert not cur.pgresult
+
+
def test_statusmessage(conn):
cur = conn.cursor()
assert cur.statusmessage is None
)
assert cur.rowcount == 42
- cur.close()
- assert cur.rowcount == 42
-
def test_rownumber(conn):
cur = conn.cursor()
assert w() is None
+async def test_pgresult(aconn):
+ cur = aconn.cursor()
+ await cur.execute("select 1")
+ assert cur.pgresult
+ await cur.close()
+ assert not cur.pgresult
+
+
async def test_statusmessage(aconn):
cur = aconn.cursor()
assert cur.statusmessage is None
)
assert cur.rowcount == 42
- await cur.close()
- assert cur.rowcount == 42
-
async def test_rownumber(aconn):
cur = aconn.cursor()
cur.close()
+def test_pgresult(conn):
+ cur = conn.cursor()
+ cur.execute("select 1")
+ assert cur.pgresult
+ cur.close()
+ assert not cur.pgresult
+
+
def test_context(conn, recwarn, retries):
for retry in retries:
with retry:
await cur.close()
+async def test_pgresult(aconn):
+ cur = aconn.cursor()
+ await cur.execute("select 1")
+ assert cur.pgresult
+ await cur.close()
+ assert not cur.pgresult
+
+
async def test_context(aconn, recwarn, retries):
async for retry in retries:
with retry: