]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
tests: fix cursor tests to run on raw cursors too
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 14 Aug 2023 18:34:19 +0000 (19:34 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 15 Aug 2023 15:29:03 +0000 (16:29 +0100)
tests/fix_faker.py
tests/test_cursor.py
tests/test_cursor_async.py
tests/test_raw_cursor.py [new file with mode: 0644]
tests/test_raw_cursor_async.py [new file with mode: 0644]

index 41376a7fa9778684705dc6e6d8944c201a1e6adc..e302d22e5aed413ebe215a663dd3fdd648017dfb 100644 (file)
@@ -146,7 +146,7 @@ class Faker:
             with conn.transaction():
                 yield
         except psycopg.DatabaseError:
-            cur = conn.cursor()
+            cur = psycopg.Cursor(conn)
             # Repeat insert one field at time, until finding the wrong one
             cur.execute(self.drop_stmt)
             cur.execute(self.create_stmt)
@@ -171,7 +171,7 @@ class Faker:
             async with aconn.transaction():
                 yield
         except psycopg.DatabaseError:
-            acur = aconn.cursor()
+            acur = psycopg.AsyncCursor(aconn)
             # Repeat insert one field at time, until finding the wrong one
             await acur.execute(self.drop_stmt)
             await acur.execute(self.create_stmt)
index f2dcb6ea3a76071dca85ab6a208abba21a8354d7..3fb15fc250499758772f90fcd7909fc783de37cf 100644 (file)
@@ -2,10 +2,11 @@
 Tests common to psycopg.Cursor and its subclasses.
 """
 
+import re
 import pickle
 import weakref
 import datetime as dt
-from typing import List, Union
+from typing import Any, List, Match, Union
 from contextlib import closing
 
 import pytest
@@ -27,6 +28,25 @@ def conn(conn, request):
     return conn
 
 
+def ph(cur: Any, query: str) -> str:
+    """Change placeholders in a query from %s to $n if testing  a raw cursor"""
+    if not isinstance(cur, (psycopg.RawCursor, psycopg.AsyncRawCursor)):
+        return query
+
+    if "%(" in query:
+        raise pytest.skip("RawCursor only supports positional placeholders")
+
+    n = 1
+
+    def s(m: Match[str]) -> str:
+        nonlocal n
+        rv = f"${n}"
+        n += 1
+        return rv
+
+    return re.sub(r"(?<!%)(%[bst])", s, query)
+
+
 def test_init(conn):
     cur = conn.cursor_factory(conn)
     cur.execute("select 1")
@@ -168,7 +188,7 @@ def test_execute_many_results(conn):
 
 def test_execute_sequence(conn):
     cur = conn.cursor()
-    rv = cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
+    rv = cur.execute(ph(cur, "select %s::int, %s::text, %s::text"), [1, "foo", None])
     assert rv is cur
     assert len(cur._results) == 1
     assert cur.pgresult.get_value(0, 0) == b"1"
@@ -189,8 +209,8 @@ def test_execute_empty_query(conn, query):
 def test_execute_type_change(conn):
     # issue #112
     conn.execute("create table bug_112 (num integer)")
-    sql = "insert into bug_112 (num) values (%s)"
     cur = conn.cursor()
+    sql = ph(cur, "insert into bug_112 (num) values (%s)")
     cur.execute(sql, (1,))
     cur.execute(sql, (100_000,))
     cur.execute("select num from bug_112 order by num")
@@ -199,8 +219,8 @@ def test_execute_type_change(conn):
 
 def test_executemany_type_change(conn):
     conn.execute("create table bug_112 (num integer)")
-    sql = "insert into bug_112 (num) values (%s)"
     cur = conn.cursor()
+    sql = ph(cur, "insert into bug_112 (num) values (%s)")
     cur.executemany(sql, [(1,), (100_000,)])
     cur.execute("select num from bug_112 order by num")
     assert cur.fetchall() == [(1,), (100_000,)]
@@ -218,7 +238,7 @@ def test_execute_copy(conn, query):
 
 def test_fetchone(conn):
     cur = conn.cursor()
-    cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
+    cur.execute(ph(cur, "select %s::int, %s::text, %s::text"), [1, "foo", None])
     assert cur.pgresult.fformat(0) == 0
 
     row = cur.fetchone()
@@ -232,7 +252,7 @@ def test_binary_cursor_execute(conn):
         conn.cursor_factory is psycopg.ClientCursor, psycopg.NotSupportedError
     ) as ex:
         cur = conn.cursor(binary=True)
-        cur.execute("select %s, %s", [1, None])
+        cur.execute(ph(cur, "select %s, %s"), [1, None])
     if ex:
         return
 
@@ -246,7 +266,7 @@ def test_execute_binary(conn):
     with raiseif(
         conn.cursor_factory is psycopg.ClientCursor, psycopg.NotSupportedError
     ) as ex:
-        cur.execute("select %s, %s", [1, None], binary=True)
+        cur.execute(ph(cur, "select %s, %s"), [1, None], binary=True)
     if ex:
         return
 
@@ -257,7 +277,7 @@ def test_execute_binary(conn):
 
 def test_binary_cursor_text_override(conn):
     cur = conn.cursor(binary=True)
-    cur.execute("select %s, %s", [1, None], binary=False)
+    cur.execute(ph(cur, "select %s, %s"), [1, None], binary=False)
     assert cur.fetchone() == (1, None)
     assert cur.pgresult.fformat(0) == 0
     assert cur.pgresult.get_value(0, 0) == b"1"
@@ -299,7 +319,7 @@ def execmany(svcconn, _execmany):
 def test_executemany(conn, execmany):
     cur = conn.cursor()
     cur.executemany(
-        "insert into execmany(num, data) values (%s, %s)",
+        ph(cur, "insert into execmany(num, data) values (%s, %s)"),
         [(10, "hello"), (20, "world")],
     )
     cur.execute("select num, data from execmany order by 1")
@@ -309,7 +329,7 @@ def test_executemany(conn, execmany):
 def test_executemany_name(conn, execmany):
     cur = conn.cursor()
     cur.executemany(
-        "insert into execmany(num, data) values (%(num)s, %(data)s)",
+        ph(cur, "insert into execmany(num, data) values (%(num)s, %(data)s)"),
         [{"num": 11, "data": "hello", "x": 1}, {"num": 21, "data": "world"}],
     )
     cur.execute("select num, data from execmany order by 1")
@@ -318,14 +338,14 @@ def test_executemany_name(conn, execmany):
 
 def test_executemany_no_data(conn, execmany):
     cur = conn.cursor()
-    cur.executemany("insert into execmany(num, data) values (%s, %s)", [])
+    cur.executemany(ph(cur, "insert into execmany(num, data) values (%s, %s)"), [])
     assert cur.rowcount == 0
 
 
 def test_executemany_rowcount(conn, execmany):
     cur = conn.cursor()
     cur.executemany(
-        "insert into execmany(num, data) values (%s, %s)",
+        ph(cur, "insert into execmany(num, data) values (%s, %s)"),
         [(10, "hello"), (20, "world")],
     )
     assert cur.rowcount == 2
@@ -334,7 +354,7 @@ def test_executemany_rowcount(conn, execmany):
 def test_executemany_returning(conn, execmany):
     cur = conn.cursor()
     cur.executemany(
-        "insert into execmany(num, data) values (%s, %s) returning num",
+        ph(cur, "insert into execmany(num, data) values (%s, %s) returning num"),
         [(10, "hello"), (20, "world")],
         returning=True,
     )
@@ -349,7 +369,7 @@ def test_executemany_returning(conn, execmany):
 def test_executemany_returning_discard(conn, execmany):
     cur = conn.cursor()
     cur.executemany(
-        "insert into execmany(num, data) values (%s, %s) returning num",
+        ph(cur, "insert into execmany(num, data) values (%s, %s) returning num"),
         [(10, "hello"), (20, "world")],
     )
     assert cur.rowcount == 2
@@ -361,7 +381,7 @@ def test_executemany_returning_discard(conn, execmany):
 def test_executemany_no_result(conn, execmany):
     cur = conn.cursor()
     cur.executemany(
-        "insert into execmany(num, data) values (%s, %s)",
+        ph(cur, "insert into execmany(num, data) values (%s, %s)"),
         [(10, "hello"), (20, "world")],
         returning=True,
     )
@@ -379,11 +399,13 @@ def test_executemany_no_result(conn, execmany):
 
 def test_executemany_rowcount_no_hit(conn, execmany):
     cur = conn.cursor()
-    cur.executemany("delete from execmany where id = %s", [(-1,), (-2,)])
+    cur.executemany(ph(cur, "delete from execmany where id = %s"), [(-1,), (-2,)])
     assert cur.rowcount == 0
-    cur.executemany("delete from execmany where id = %s", [])
+    cur.executemany(ph(cur, "delete from execmany where id = %s"), [])
     assert cur.rowcount == 0
-    cur.executemany("delete from execmany where id = %s returning num", [(-1,), (-2,)])
+    cur.executemany(
+        ph(cur, "delete from execmany where id = %s returning num"), [(-1,), (-2,)]
+    )
     assert cur.rowcount == 0
 
 
@@ -398,7 +420,7 @@ def test_executemany_rowcount_no_hit(conn, execmany):
 def test_executemany_badquery(conn, query):
     cur = conn.cursor()
     with pytest.raises(psycopg.DatabaseError):
-        cur.executemany(query, [(10, "hello"), (20, "world")])
+        cur.executemany(ph(cur, query), [(10, "hello"), (20, "world")])
 
 
 @pytest.mark.parametrize("fmt_in", PyFormat)
@@ -406,7 +428,7 @@ def test_executemany_null_first(conn, fmt_in):
     cur = conn.cursor()
     cur.execute("create table testmany (a bigint, b bigint)")
     cur.executemany(
-        f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})",
+        ph(cur, f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})"),
         [[1, None], [3, 4]],
     )
     with pytest.raises((psycopg.DataError, psycopg.ProgrammingError)):
@@ -610,7 +632,7 @@ def test_scroll(conn):
 )
 def test_execute_params_named(conn, query, params, want):
     cur = conn.cursor()
-    cur.execute(query, params)
+    cur.execute(ph(cur, query), params)
     rec = cur.fetchone()
     assert rec == want
 
@@ -619,7 +641,7 @@ def test_stream(conn):
     cur = conn.cursor()
     recs = []
     for rec in cur.stream(
-        "select i, '2021-01-01'::date + i from generate_series(1, %s) as i",
+        ph(cur, "select i, '2021-01-01'::date + i from generate_series(1, %s) as i"),
         [2],
     ):
         recs.append(rec)
index 1a05d966a6e6b42621cbb7dd02335ce7e4f90962..df70f0130be36c72e656570244e3f6aabe3da074 100644 (file)
@@ -9,7 +9,7 @@ from psycopg.adapt import PyFormat
 from psycopg.types import TypeInfo
 
 from .utils import alist, gc_collect, raiseif
-from .test_cursor import my_row_factory
+from .test_cursor import my_row_factory, ph
 from .test_cursor import execmany, _execmany  # noqa: F401
 from .fix_crdb import crdb_encoding
 
@@ -165,7 +165,9 @@ async def test_execute_many_results(aconn):
 
 async def test_execute_sequence(aconn):
     cur = aconn.cursor()
-    rv = await cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
+    rv = await cur.execute(
+        ph(cur, "select %s::int, %s::text, %s::text"), [1, "foo", None]
+    )
     assert rv is cur
     assert len(cur._results) == 1
     assert cur.pgresult.get_value(0, 0) == b"1"
@@ -186,8 +188,8 @@ async def test_execute_empty_query(aconn, query):
 async def test_execute_type_change(aconn):
     # issue #112
     await aconn.execute("create table bug_112 (num integer)")
-    sql = "insert into bug_112 (num) values (%s)"
     cur = aconn.cursor()
+    sql = ph(cur, "insert into bug_112 (num) values (%s)")
     await cur.execute(sql, (1,))
     await cur.execute(sql, (100_000,))
     await cur.execute("select num from bug_112 order by num")
@@ -196,8 +198,8 @@ async def test_execute_type_change(aconn):
 
 async def test_executemany_type_change(aconn):
     await aconn.execute("create table bug_112 (num integer)")
-    sql = "insert into bug_112 (num) values (%s)"
     cur = aconn.cursor()
+    sql = ph(cur, "insert into bug_112 (num) values (%s)")
     await cur.executemany(sql, [(1,), (100_000,)])
     await cur.execute("select num from bug_112 order by num")
     assert (await cur.fetchall()) == [(1,), (100_000,)]
@@ -215,7 +217,7 @@ async def test_execute_copy(aconn, query):
 
 async def test_fetchone(aconn):
     cur = aconn.cursor()
-    await cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
+    await cur.execute(ph(cur, "select %s::int, %s::text, %s::text"), [1, "foo", None])
     assert cur.pgresult.fformat(0) == 0
 
     row = await cur.fetchone()
@@ -229,7 +231,7 @@ async def test_binary_cursor_execute(aconn):
         aconn.cursor_factory is psycopg.AsyncClientCursor, psycopg.NotSupportedError
     ) as ex:
         cur = aconn.cursor(binary=True)
-        await cur.execute("select %s, %s", [1, None])
+        await cur.execute(ph(cur, "select %s, %s"), [1, None])
     if ex:
         return
 
@@ -243,7 +245,7 @@ async def test_execute_binary(aconn):
     with raiseif(
         aconn.cursor_factory is psycopg.AsyncClientCursor, psycopg.NotSupportedError
     ) as ex:
-        await cur.execute("select %s, %s", [1, None], binary=True)
+        await cur.execute(ph(cur, "select %s, %s"), [1, None], binary=True)
     if ex:
         return
 
@@ -254,7 +256,7 @@ async def test_execute_binary(aconn):
 
 async def test_binary_cursor_text_override(aconn):
     cur = aconn.cursor(binary=True)
-    await cur.execute("select %s, %s", [1, None], binary=False)
+    await cur.execute(ph(cur, "select %s, %s"), [1, None], binary=False)
     assert (await cur.fetchone()) == (1, None)
     assert cur.pgresult.fformat(0) == 0
     assert cur.pgresult.get_value(0, 0) == b"1"
@@ -280,7 +282,7 @@ async def test_query_badenc(aconn, encoding):
 async def test_executemany(aconn, execmany):
     cur = aconn.cursor()
     await cur.executemany(
-        "insert into execmany(num, data) values (%s, %s)",
+        ph(cur, "insert into execmany(num, data) values (%s, %s)"),
         [(10, "hello"), (20, "world")],
     )
     await cur.execute("select num, data from execmany order by 1")
@@ -291,7 +293,7 @@ async def test_executemany(aconn, execmany):
 async def test_executemany_name(aconn, execmany):
     cur = aconn.cursor()
     await cur.executemany(
-        "insert into execmany(num, data) values (%(num)s, %(data)s)",
+        ph(cur, "insert into execmany(num, data) values (%(num)s, %(data)s)"),
         [{"num": 11, "data": "hello", "x": 1}, {"num": 21, "data": "world"}],
     )
     await cur.execute("select num, data from execmany order by 1")
@@ -301,14 +303,16 @@ async def test_executemany_name(aconn, execmany):
 
 async def test_executemany_no_data(aconn, execmany):
     cur = aconn.cursor()
-    await cur.executemany("insert into execmany(num, data) values (%s, %s)", [])
+    await cur.executemany(
+        ph(cur, "insert into execmany(num, data) values (%s, %s)"), []
+    )
     assert cur.rowcount == 0
 
 
 async def test_executemany_rowcount(aconn, execmany):
     cur = aconn.cursor()
     await cur.executemany(
-        "insert into execmany(num, data) values (%s, %s)",
+        ph(cur, "insert into execmany(num, data) values (%s, %s)"),
         [(10, "hello"), (20, "world")],
     )
     assert cur.rowcount == 2
@@ -317,7 +321,7 @@ async def test_executemany_rowcount(aconn, execmany):
 async def test_executemany_returning(aconn, execmany):
     cur = aconn.cursor()
     await cur.executemany(
-        "insert into execmany(num, data) values (%s, %s) returning num",
+        ph(cur, "insert into execmany(num, data) values (%s, %s) returning num"),
         [(10, "hello"), (20, "world")],
         returning=True,
     )
@@ -332,7 +336,7 @@ async def test_executemany_returning(aconn, execmany):
 async def test_executemany_returning_discard(aconn, execmany):
     cur = aconn.cursor()
     await cur.executemany(
-        "insert into execmany(num, data) values (%s, %s) returning num",
+        ph(cur, "insert into execmany(num, data) values (%s, %s) returning num"),
         [(10, "hello"), (20, "world")],
     )
     assert cur.rowcount == 2
@@ -344,7 +348,7 @@ async def test_executemany_returning_discard(aconn, execmany):
 async def test_executemany_no_result(aconn, execmany):
     cur = aconn.cursor()
     await cur.executemany(
-        "insert into execmany(num, data) values (%s, %s)",
+        ph(cur, "insert into execmany(num, data) values (%s, %s)"),
         [(10, "hello"), (20, "world")],
         returning=True,
     )
@@ -362,12 +366,12 @@ async def test_executemany_no_result(aconn, execmany):
 
 async def test_executemany_rowcount_no_hit(aconn, execmany):
     cur = aconn.cursor()
-    await cur.executemany("delete from execmany where id = %s", [(-1,), (-2,)])
+    await cur.executemany(ph(cur, "delete from execmany where id = %s"), [(-1,), (-2,)])
     assert cur.rowcount == 0
-    await cur.executemany("delete from execmany where id = %s", [])
+    await cur.executemany(ph(cur, "delete from execmany where id = %s"), [])
     assert cur.rowcount == 0
     await cur.executemany(
-        "delete from execmany where id = %s returning num", [(-1,), (-2,)]
+        ph(cur, "delete from execmany where id = %s returning num"), [(-1,), (-2,)]
     )
     assert cur.rowcount == 0
 
@@ -383,7 +387,7 @@ async def test_executemany_rowcount_no_hit(aconn, execmany):
 async def test_executemany_badquery(aconn, query):
     cur = aconn.cursor()
     with pytest.raises(psycopg.DatabaseError):
-        await cur.executemany(query, [(10, "hello"), (20, "world")])
+        await cur.executemany(ph(cur, query), [(10, "hello"), (20, "world")])
 
 
 @pytest.mark.parametrize("fmt_in", PyFormat)
@@ -391,12 +395,12 @@ async def test_executemany_null_first(aconn, fmt_in):
     cur = aconn.cursor()
     await cur.execute("create table testmany (a bigint, b bigint)")
     await cur.executemany(
-        f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})",
+        ph(cur, f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})"),
         [[1, None], [3, 4]],
     )
     with pytest.raises((psycopg.DataError, psycopg.ProgrammingError)):
         await cur.executemany(
-            f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})",
+            ph(cur, f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})"),
             [[1, ""], [3, 4]],
         )
 
@@ -599,7 +603,7 @@ async def test_scroll(aconn):
 )
 async def test_execute_params_named(aconn, query, params, want):
     cur = aconn.cursor()
-    await cur.execute(query, params)
+    await cur.execute(ph(cur, query), params)
     rec = await cur.fetchone()
     assert rec == want
 
@@ -608,7 +612,7 @@ async def test_stream(aconn):
     cur = aconn.cursor()
     recs = []
     async for rec in cur.stream(
-        "select i, '2021-01-01'::date + i from generate_series(1, %s) as i",
+        ph(cur, "select i, '2021-01-01'::date + i from generate_series(1, %s) as i"),
         [2],
     ):
         recs.append(rec)
diff --git a/tests/test_raw_cursor.py b/tests/test_raw_cursor.py
new file mode 100644 (file)
index 0000000..64169dd
--- /dev/null
@@ -0,0 +1,112 @@
+import pytest
+import psycopg
+from psycopg import pq, rows, errors as e
+from psycopg.adapt import PyFormat
+
+from .test_cursor import ph
+from .utils import gc_collect, gc_count
+
+
+@pytest.fixture
+def conn(conn):
+    conn.cursor_factory = psycopg.RawCursor
+    return conn
+
+
+def test_default_cursor(conn):
+    cur = conn.cursor()
+    assert type(cur) is psycopg.RawCursor
+
+
+def test_str(conn):
+    cur = conn.cursor()
+    assert "psycopg.RawCursor" in str(cur)
+
+
+def test_sequence_only(conn):
+    cur = conn.cursor()
+    cur.execute("select 1", ())
+    assert cur.fetchone() == (1,)
+
+    with pytest.raises(TypeError, match="sequence"):
+        cur.execute("select 1", {})
+
+
+def test_execute_many_results_param(conn):
+    cur = conn.cursor()
+    # Postgres raises SyntaxError, CRDB raises InvalidPreparedStatementDefinition
+    with pytest.raises((e.SyntaxError, e.InvalidPreparedStatementDefinition)):
+        cur.execute("select $1; select generate_series(1, $2)", ("foo", 3))
+
+
+def test_query_params_execute(conn):
+    cur = conn.cursor()
+    assert cur._query is None
+
+    cur.execute("select $1, $2::text", [1, None])
+    assert cur._query is not None
+    assert cur._query.query == b"select $1, $2::text"
+    assert cur._query.params == [b"\x00\x01", None]
+
+    cur.execute("select 1")
+    assert cur._query.query == b"select 1"
+    assert not cur._query.params
+
+    with pytest.raises(psycopg.DataError):
+        cur.execute("select $1::int", ["wat"])
+
+    assert cur._query.query == b"select $1::int"
+    assert cur._query.params == [b"wat"]
+
+
+def test_query_params_executemany(conn):
+    cur = conn.cursor()
+
+    cur.executemany("select $1, $2", [[1, 2], [3, 4]])
+    assert cur._query.query == b"select $1, $2"
+    assert cur._query.params == [b"\x00\x03", b"\x00\x04"]
+
+
+@pytest.mark.slow
+@pytest.mark.parametrize("fmt", PyFormat)
+@pytest.mark.parametrize("fmt_out", pq.Format)
+@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
+@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
+def test_leak(conn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
+    faker.format = fmt
+    faker.choose_schema(ncols=5)
+    faker.make_records(10)
+    row_factory = getattr(rows, row_factory)
+
+    def work():
+        with conn_cls.connect(dsn) as conn, conn.transaction(force_rollback=True):
+            with conn.cursor(binary=fmt_out, row_factory=row_factory) as cur:
+                cur.execute(faker.drop_stmt)
+                cur.execute(faker.create_stmt)
+                with faker.find_insert_problem(conn):
+                    cur.executemany(faker.insert_stmt, faker.records)
+                cur.execute(ph(cur, faker.select_stmt))
+
+                if fetch == "one":
+                    while True:
+                        tmp = cur.fetchone()
+                        if tmp is None:
+                            break
+                elif fetch == "many":
+                    while True:
+                        tmp = cur.fetchmany(3)
+                        if not tmp:
+                            break
+                elif fetch == "all":
+                    cur.fetchall()
+                elif fetch == "iter":
+                    for rec in cur:
+                        pass
+
+    n = []
+    gc_collect()
+    for i in range(3):
+        work()
+        gc_collect()
+        n.append(gc_count())
+    assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
diff --git a/tests/test_raw_cursor_async.py b/tests/test_raw_cursor_async.py
new file mode 100644 (file)
index 0000000..037ee7f
--- /dev/null
@@ -0,0 +1,113 @@
+import pytest
+import psycopg
+from psycopg import pq, rows, errors as e
+from psycopg.adapt import PyFormat
+
+from .test_cursor import ph
+from .utils import gc_collect, gc_count
+
+
+@pytest.fixture
+async def aconn(aconn, anyio_backend):
+    aconn.cursor_factory = psycopg.AsyncRawCursor
+    return aconn
+
+
+async def test_default_cursor(aconn):
+    cur = aconn.cursor()
+    assert type(cur) is psycopg.AsyncRawCursor
+
+
+async def test_str(aconn):
+    cur = aconn.cursor()
+    assert "psycopg.AsyncRawCursor" in str(cur)
+
+
+async def test_sequence_only(aconn):
+    cur = aconn.cursor()
+    await cur.execute("select 1", ())
+    assert await cur.fetchone() == (1,)
+
+    with pytest.raises(TypeError, match="sequence"):
+        await cur.execute("select 1", {})
+
+
+async def test_execute_many_results_param(aconn):
+    cur = aconn.cursor()
+    # Postgres raises SyntaxError, CRDB raises InvalidPreparedStatementDefinition
+    with pytest.raises((e.SyntaxError, e.InvalidPreparedStatementDefinition)):
+        await cur.execute("select $1; select generate_series(1, $2)", ("foo", 3))
+
+
+async def test_query_params_execute(aconn):
+    cur = aconn.cursor()
+    assert cur._query is None
+
+    await cur.execute("select $1, $2::text", [1, None])
+    assert cur._query is not None
+    assert cur._query.query == b"select $1, $2::text"
+    assert cur._query.params == [b"\x00\x01", None]
+
+    await cur.execute("select 1")
+    assert cur._query.query == b"select 1"
+    assert not cur._query.params
+
+    with pytest.raises(psycopg.DataError):
+        await cur.execute("select $1::int", ["wat"])
+
+    assert cur._query.query == b"select $1::int"
+    assert cur._query.params == [b"wat"]
+
+
+async def test_query_params_executemany(aconn):
+    cur = aconn.cursor()
+
+    await cur.executemany("select $1, $2", [[1, 2], [3, 4]])
+    assert cur._query.query == b"select $1, $2"
+    assert cur._query.params == [b"\x00\x03", b"\x00\x04"]
+
+
+@pytest.mark.slow
+@pytest.mark.parametrize("fmt", PyFormat)
+@pytest.mark.parametrize("fmt_out", pq.Format)
+@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
+@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
+async def test_leak(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
+    faker.format = fmt
+    faker.choose_schema(ncols=5)
+    faker.make_records(10)
+    row_factory = getattr(rows, row_factory)
+
+    async def work():
+        async with await aconn_cls.connect(dsn) as aconn:
+            async with aconn.transaction(force_rollback=True):
+                async with aconn.cursor(binary=fmt_out, row_factory=row_factory) as cur:
+                    await cur.execute(faker.drop_stmt)
+                    await cur.execute(faker.create_stmt)
+                    async with faker.find_insert_problem_async(aconn):
+                        await cur.executemany(faker.insert_stmt, faker.records)
+                    await cur.execute(ph(cur, faker.select_stmt))
+
+                    if fetch == "one":
+                        while True:
+                            tmp = await cur.fetchone()
+                            if tmp is None:
+                                break
+                    elif fetch == "many":
+                        while True:
+                            tmp = await cur.fetchmany(3)
+                            if not tmp:
+                                break
+                    elif fetch == "all":
+                        await cur.fetchall()
+                    elif fetch == "iter":
+                        async for rec in cur:
+                            pass
+
+    n = []
+    gc_collect()
+    for i in range(3):
+        await work()
+        gc_collect()
+        n.append(gc_count())
+    assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"