class NamedCursorHelper(Generic[ConnectionType]):
__slots__ = ("name", "_wcur")
+ """Helper object for common NamedCursor code.
+
+ TODO: this should be a mixin, but couldn't find a way to work it
+ correctly with the generic.
+ """
def __init__(
self,
results = yield from execute(cur._conn.pgconn)
cur._execute_results(results)
+ def _close_gen(self) -> PQGen[None]:
+ cur = self._cur
+ query = sql.SQL("close {}").format(sql.Identifier(self.name))
+ yield from cur._conn._exec_command(query)
+
def _make_declare_statement(
self, query: Query, scrollable: bool, hold: bool
) -> sql.Composable:
"""
Close the current cursor and free associated resources.
"""
- # TODO close the cursor for real
+ with self._conn.lock:
+ self._conn.wait(self._helper._close_gen())
self._close()
def execute(
"""
Close the current cursor and free associated resources.
"""
- # TODO close the cursor for real
+ async with self._conn.lock:
+ await self._conn.wait(self._helper._close_gen())
self._close()
async def execute(
assert cur.description[0].name == "bar"
assert cur.description[0].type_code == cur.adapters.types["int4"].oid
assert cur.pgresult.ntuples == 0
+
+
+def test_close(conn, recwarn):
+ cur = conn.cursor("foo")
+ cur.execute("select generate_series(1, 10) as bar")
+ cur.close()
+ assert cur.closed
+
+ assert not conn.execute(
+ "select * from pg_cursors where name = 'foo'"
+ ).fetchone()
+ del cur
+ assert not recwarn
+
+
+def test_context(conn, recwarn):
+ with conn.cursor("foo") as cur:
+ cur.execute("select generate_series(1, 10) as bar")
+
+ assert cur.closed
+ assert not conn.execute(
+ "select * from pg_cursors where name = 'foo'"
+ ).fetchone()
+ del cur
+ assert not recwarn
+
+
+def test_warn_close(conn, recwarn):
+ cur = conn.cursor("foo")
+ cur.execute("select generate_series(1, 10) as bar")
+ del cur
+ assert ".close()" in str(recwarn.pop(ResourceWarning).message)
assert cur.description[0].name == "bar"
assert cur.description[0].type_code == cur.adapters.types["int4"].oid
assert cur.pgresult.ntuples == 0
+
+
+async def test_close(aconn, recwarn):
+ cur = await aconn.cursor("foo")
+ await cur.execute("select generate_series(1, 10) as bar")
+ await cur.close()
+ assert cur.closed
+
+ assert not await (
+ await aconn.execute("select * from pg_cursors where name = 'foo'")
+ ).fetchone()
+ del cur
+ assert not recwarn
+
+
+async def test_context(aconn, recwarn):
+ async with await aconn.cursor("foo") as cur:
+ await cur.execute("select generate_series(1, 10) as bar")
+
+ assert cur.closed
+ assert not await (
+ await aconn.execute("select * from pg_cursors where name = 'foo'")
+ ).fetchone()
+ del cur
+ assert not recwarn
+
+
+async def test_warn_close(aconn, recwarn):
+ cur = await aconn.cursor("foo")
+ await cur.execute("select generate_series(1, 10) as bar")
+ del cur
+ assert ".close()" in str(recwarn.pop(ResourceWarning).message)