]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
test: unify server and client async cursor tests
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 5 Aug 2023 18:38:31 +0000 (19:38 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 10 Aug 2023 00:43:40 +0000 (01:43 +0100)
Also clean up differences between sync and async cursor tests.

tests/test_client_cursor.py
tests/test_client_cursor_async.py
tests/test_cursor.py
tests/test_cursor_async.py
tests/test_default_cursor.py
tests/test_default_cursor_async.py [new file with mode: 0644]

index 066039e999b3c5f0ff4cff2870721d899582afa3..d8b4e5abf72cd760c060e7068b9c908d28c60c6c 100644 (file)
@@ -84,7 +84,6 @@ def test_leak(conn_cls, dsn, faker, fetch, row_factory):
                 cur.execute(faker.create_stmt)
                 with faker.find_insert_problem(conn):
                     cur.executemany(faker.insert_stmt, faker.records)
-
                 cur.execute(faker.select_stmt)
 
                 if fetch == "one":
@@ -109,6 +108,7 @@ def test_leak(conn_cls, dsn, faker, fetch, row_factory):
         work()
         gc_collect()
         n.append(gc_count())
+
     assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
 
 
index 67b314acb3a78ccee7a56058505a082941ecf9f4..43505cb4c15bc0aaff75a90fb68ef45024a3c879 100644 (file)
@@ -1,43 +1,22 @@
-import pytest
-import weakref
 import datetime as dt
-from typing import List
 
+import pytest
 import psycopg
-from psycopg import sql, rows
-from psycopg.adapt import PyFormat
-from psycopg.types import TypeInfo
+from psycopg import rows
 
-from .utils import alist, gc_collect, gc_count
-from .test_cursor import my_row_factory
-from .test_cursor import execmany, _execmany  # noqa: F401
+from .utils import gc_collect, gc_count
 from .fix_crdb import crdb_encoding
 
-execmany = execmany  # avoid F811 underneath
-pytestmark = pytest.mark.anyio
-
 
 @pytest.fixture
-async def aconn(aconn):
+async def aconn(aconn, anyio_backend):
     aconn.cursor_factory = psycopg.AsyncClientCursor
     return aconn
 
 
-async def test_init(aconn):
-    cur = psycopg.AsyncClientCursor(aconn)
-    await cur.execute("select 1")
-    assert (await cur.fetchone()) == (1,)
-
-    aconn.row_factory = rows.dict_row
-    cur = psycopg.AsyncClientCursor(aconn)
-    await cur.execute("select 1 as a")
-    assert (await cur.fetchone()) == {"a": 1}
-
-
-async def test_init_factory(aconn):
-    cur = psycopg.AsyncClientCursor(aconn, row_factory=rows.dict_row)
-    await cur.execute("select 1 as a")
-    assert (await cur.fetchone()) == {"a": 1}
+async def test_default_cursor(aconn):
+    cur = aconn.cursor()
+    assert type(cur) is psycopg.AsyncClientCursor
 
 
 async def test_from_cursor_factory(aconn_cls, dsn):
@@ -47,510 +26,23 @@ async def test_from_cursor_factory(aconn_cls, dsn):
         cur = aconn.cursor()
         assert type(cur) is psycopg.AsyncClientCursor
 
-        await cur.execute("select %s", (1,))
-        assert await cur.fetchone() == (1,)
-        assert cur._query
-        assert cur._query.query == b"select 1"
-
-
-async def test_close(aconn):
-    cur = aconn.cursor()
-    assert not cur.closed
-    await cur.close()
-    assert cur.closed
-
-    with pytest.raises(psycopg.InterfaceError):
-        await cur.execute("select 'foo'")
-
-    await cur.close()
-    assert cur.closed
-
-
-async def test_cursor_close_fetchone(aconn):
-    cur = aconn.cursor()
-    assert not cur.closed
-
-    query = "select * from generate_series(1, 10)"
-    await cur.execute(query)
-    for _ in range(5):
-        await cur.fetchone()
-
-    await cur.close()
-    assert cur.closed
-
-    with pytest.raises(psycopg.InterfaceError):
-        await cur.fetchone()
-
-
-async def test_cursor_close_fetchmany(aconn):
-    cur = aconn.cursor()
-    assert not cur.closed
-
-    query = "select * from generate_series(1, 10)"
-    await cur.execute(query)
-    assert len(await cur.fetchmany(2)) == 2
-
-    await cur.close()
-    assert cur.closed
-
-    with pytest.raises(psycopg.InterfaceError):
-        await cur.fetchmany(2)
-
-
-async def test_cursor_close_fetchall(aconn):
-    cur = aconn.cursor()
-    assert not cur.closed
-
-    query = "select * from generate_series(1, 10)"
-    await cur.execute(query)
-    assert len(await cur.fetchall()) == 10
-
-    await cur.close()
-    assert cur.closed
-
-    with pytest.raises(psycopg.InterfaceError):
-        await cur.fetchall()
-
-
-async def test_context(aconn):
-    async with aconn.cursor() as cur:
-        assert not cur.closed
-
-    assert cur.closed
-
-
-@pytest.mark.slow
-async def test_weakref(aconn):
-    cur = aconn.cursor()
-    w = weakref.ref(cur)
-    await cur.close()
-    del cur
-    gc_collect()
-    assert w() is None
-
-
-async def test_pgresult(aconn):
-    cur = aconn.cursor()
-    await cur.execute("select 1")
-    assert cur.pgresult
-    await cur.close()
-    assert not cur.pgresult
-
 
-async def test_statusmessage(aconn):
-    cur = aconn.cursor()
-    assert cur.statusmessage is None
-
-    await cur.execute("select generate_series(1, 10)")
-    assert cur.statusmessage == "SELECT 10"
-
-    await cur.execute("create table statusmessage ()")
-    assert cur.statusmessage == "CREATE TABLE"
-
-    with pytest.raises(psycopg.ProgrammingError):
-        await cur.execute("wat")
-    assert cur.statusmessage is None
-
-
-async def test_execute_sql(aconn):
-    cur = aconn.cursor()
-    await cur.execute(sql.SQL("select {value}").format(value="hello"))
-    assert await cur.fetchone() == ("hello",)
-
-
-async def test_execute_many_results(aconn):
+async def test_execute_many_results_param(aconn):
     cur = aconn.cursor()
     assert cur.nextset() is None
 
-    rv = await cur.execute("select %s; select generate_series(1,%s)", ("foo", 3))
+    rv = await cur.execute("select %s; select generate_series(1, %s)", ("foo", 3))
     assert rv is cur
     assert (await cur.fetchall()) == [("foo",)]
     assert cur.rowcount == 1
     assert cur.nextset()
     assert (await cur.fetchall()) == [(1,), (2,), (3,)]
-    assert cur.rowcount == 3
     assert cur.nextset() is None
 
     await cur.close()
     assert cur.nextset() is None
 
 
-async def test_execute_sequence(aconn):
-    cur = aconn.cursor()
-    rv = await cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
-    assert rv is cur
-    assert len(cur._results) == 1
-    assert cur.pgresult.get_value(0, 0) == b"1"
-    assert cur.pgresult.get_value(0, 1) == b"foo"
-    assert cur.pgresult.get_value(0, 2) is None
-    assert cur.nextset() is None
-
-
-@pytest.mark.parametrize("query", ["", " ", ";"])
-async def test_execute_empty_query(aconn, query):
-    cur = aconn.cursor()
-    await cur.execute(query)
-    assert cur.pgresult.status == cur.ExecStatus.EMPTY_QUERY
-    with pytest.raises(psycopg.ProgrammingError):
-        await cur.fetchone()
-
-
-async def test_execute_type_change(aconn):
-    # issue #112
-    await aconn.execute("create table bug_112 (num integer)")
-    sql = "insert into bug_112 (num) values (%s)"
-    cur = aconn.cursor()
-    await cur.execute(sql, (1,))
-    await cur.execute(sql, (100_000,))
-    await cur.execute("select num from bug_112 order by num")
-    assert (await cur.fetchall()) == [(1,), (100_000,)]
-
-
-async def test_executemany_type_change(aconn):
-    await aconn.execute("create table bug_112 (num integer)")
-    sql = "insert into bug_112 (num) values (%s)"
-    cur = aconn.cursor()
-    await cur.executemany(sql, [(1,), (100_000,)])
-    await cur.execute("select num from bug_112 order by num")
-    assert (await cur.fetchall()) == [(1,), (100_000,)]
-
-
-@pytest.mark.parametrize(
-    "query", ["copy testcopy from stdin", "copy testcopy to stdout"]
-)
-async def test_execute_copy(aconn, query):
-    cur = aconn.cursor()
-    await cur.execute("create table testcopy (id int)")
-    with pytest.raises(psycopg.ProgrammingError):
-        await cur.execute(query)
-
-
-async def test_fetchone(aconn):
-    cur = aconn.cursor()
-    await cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
-    assert cur.pgresult.fformat(0) == 0
-
-    row = await cur.fetchone()
-    assert row == (1, "foo", None)
-    row = await cur.fetchone()
-    assert row is None
-
-
-async def test_binary_cursor_execute(aconn):
-    with pytest.raises(psycopg.NotSupportedError):
-        cur = aconn.cursor(binary=True)
-        await cur.execute("select %s, %s", [1, None])
-
-
-async def test_execute_binary(aconn):
-    with pytest.raises(psycopg.NotSupportedError):
-        cur = aconn.cursor()
-        await cur.execute("select %s, %s", [1, None], binary=True)
-
-
-async def test_binary_cursor_text_override(aconn):
-    cur = aconn.cursor(binary=True)
-    await cur.execute("select %s, %s", [1, None], binary=False)
-    assert (await cur.fetchone()) == (1, None)
-    assert cur.pgresult.fformat(0) == 0
-    assert cur.pgresult.get_value(0, 0) == b"1"
-
-
-@pytest.mark.parametrize("encoding", ["utf8", crdb_encoding("latin9")])
-async def test_query_encode(aconn, encoding):
-    await aconn.execute(f"set client_encoding to {encoding}")
-    cur = aconn.cursor()
-    await cur.execute("select '\u20ac'")
-    (res,) = await cur.fetchone()
-    assert res == "\u20ac"
-
-
-@pytest.mark.parametrize("encoding", [crdb_encoding("latin1")])
-async def test_query_badenc(aconn, encoding):
-    await aconn.execute(f"set client_encoding to {encoding}")
-    cur = aconn.cursor()
-    with pytest.raises(UnicodeEncodeError):
-        await cur.execute("select '\u20ac'")
-
-
-async def test_executemany(aconn, execmany):
-    cur = aconn.cursor()
-    await cur.executemany(
-        "insert into execmany(num, data) values (%s, %s)",
-        [(10, "hello"), (20, "world")],
-    )
-    await cur.execute("select num, data from execmany order by 1")
-    rv = await cur.fetchall()
-    assert rv == [(10, "hello"), (20, "world")]
-
-
-async def test_executemany_name(aconn, execmany):
-    cur = aconn.cursor()
-    await cur.executemany(
-        "insert into execmany(num, data) values (%(num)s, %(data)s)",
-        [{"num": 11, "data": "hello", "x": 1}, {"num": 21, "data": "world"}],
-    )
-    await cur.execute("select num, data from execmany order by 1")
-    rv = await cur.fetchall()
-    assert rv == [(11, "hello"), (21, "world")]
-
-
-async def test_executemany_no_data(aconn, execmany):
-    cur = aconn.cursor()
-    await cur.executemany("insert into execmany(num, data) values (%s, %s)", [])
-    assert cur.rowcount == 0
-
-
-async def test_executemany_rowcount(aconn, execmany):
-    cur = aconn.cursor()
-    await cur.executemany(
-        "insert into execmany(num, data) values (%s, %s)",
-        [(10, "hello"), (20, "world")],
-    )
-    assert cur.rowcount == 2
-
-
-async def test_executemany_returning(aconn, execmany):
-    cur = aconn.cursor()
-    await cur.executemany(
-        "insert into execmany(num, data) values (%s, %s) returning num",
-        [(10, "hello"), (20, "world")],
-        returning=True,
-    )
-    assert cur.rowcount == 1
-    assert (await cur.fetchone()) == (10,)
-    assert cur.nextset()
-    assert cur.rowcount == 1
-    assert (await cur.fetchone()) == (20,)
-    assert cur.nextset() is None
-
-
-async def test_executemany_returning_discard(aconn, execmany):
-    cur = aconn.cursor()
-    await cur.executemany(
-        "insert into execmany(num, data) values (%s, %s) returning num",
-        [(10, "hello"), (20, "world")],
-    )
-    assert cur.rowcount == 2
-    with pytest.raises(psycopg.ProgrammingError):
-        await cur.fetchone()
-    assert cur.nextset() is None
-
-
-async def test_executemany_no_result(aconn, execmany):
-    cur = aconn.cursor()
-    await cur.executemany(
-        "insert into execmany(num, data) values (%s, %s)",
-        [(10, "hello"), (20, "world")],
-        returning=True,
-    )
-    assert cur.rowcount == 1
-    assert cur.statusmessage.startswith("INSERT")
-    with pytest.raises(psycopg.ProgrammingError):
-        await cur.fetchone()
-    pgresult = cur.pgresult
-    assert cur.nextset()
-    assert cur.rowcount == 1
-    assert cur.statusmessage.startswith("INSERT")
-    assert pgresult is not cur.pgresult
-    assert cur.nextset() is None
-
-
-async def test_executemany_rowcount_no_hit(aconn, execmany):
-    cur = aconn.cursor()
-    await cur.executemany("delete from execmany where id = %s", [(-1,), (-2,)])
-    assert cur.rowcount == 0
-    await cur.executemany("delete from execmany where id = %s", [])
-    assert cur.rowcount == 0
-    await cur.executemany(
-        "delete from execmany where id = %s returning num", [(-1,), (-2,)]
-    )
-    assert cur.rowcount == 0
-
-
-@pytest.mark.parametrize(
-    "query",
-    [
-        "insert into nosuchtable values (%s, %s)",
-        "copy (select %s, %s) to stdout",
-        "wat (%s, %s)",
-    ],
-)
-async def test_executemany_badquery(aconn, query):
-    cur = aconn.cursor()
-    with pytest.raises(psycopg.DatabaseError):
-        await cur.executemany(query, [(10, "hello"), (20, "world")])
-
-
-@pytest.mark.parametrize("fmt_in", PyFormat)
-async def test_executemany_null_first(aconn, fmt_in):
-    cur = aconn.cursor()
-    await cur.execute("create table testmany (a bigint, b bigint)")
-    await cur.executemany(
-        f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})",
-        [[1, None], [3, 4]],
-    )
-    with pytest.raises((psycopg.DataError, psycopg.ProgrammingError)):
-        await cur.executemany(
-            f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})",
-            [[1, ""], [3, 4]],
-        )
-
-
-async def test_rowcount(aconn):
-    cur = aconn.cursor()
-
-    await cur.execute("select 1 from generate_series(1, 0)")
-    assert cur.rowcount == 0
-
-    await cur.execute("select 1 from generate_series(1, 42)")
-    assert cur.rowcount == 42
-
-    await cur.execute("create table test_rowcount_notuples (id int primary key)")
-    assert cur.rowcount == -1
-
-    await cur.execute(
-        "insert into test_rowcount_notuples select generate_series(1, 42)"
-    )
-    assert cur.rowcount == 42
-
-
-async def test_rownumber(aconn):
-    cur = aconn.cursor()
-    assert cur.rownumber is None
-
-    await cur.execute("select 1 from generate_series(1, 42)")
-    assert cur.rownumber == 0
-
-    await cur.fetchone()
-    assert cur.rownumber == 1
-    await cur.fetchone()
-    assert cur.rownumber == 2
-    await cur.fetchmany(10)
-    assert cur.rownumber == 12
-    rns: List[int] = []
-    async for i in cur:
-        assert cur.rownumber
-        rns.append(cur.rownumber)
-        if len(rns) >= 3:
-            break
-    assert rns == [13, 14, 15]
-    assert len(await cur.fetchall()) == 42 - rns[-1]
-    assert cur.rownumber == 42
-
-
-async def test_iter(aconn):
-    cur = aconn.cursor()
-    await cur.execute("select generate_series(1, 3)")
-    res = []
-    async for rec in cur:
-        res.append(rec)
-    assert res == [(1,), (2,), (3,)]
-
-
-async def test_iter_stop(aconn):
-    cur = aconn.cursor()
-    await cur.execute("select generate_series(1, 3)")
-    async for rec in cur:
-        assert rec == (1,)
-        break
-
-    async for rec in cur:
-        assert rec == (2,)
-        break
-
-    assert (await cur.fetchone()) == (3,)
-    async for rec in cur:
-        assert False
-
-
-async def test_row_factory(aconn):
-    cur = aconn.cursor(row_factory=my_row_factory)
-    await cur.execute("select 'foo' as bar")
-    (r,) = await cur.fetchone()
-    assert r == "FOObar"
-
-    await cur.execute("select 'x' as x; select 'y' as y, 'z' as z")
-    assert await cur.fetchall() == [["Xx"]]
-    assert cur.nextset()
-    assert await cur.fetchall() == [["Yy", "Zz"]]
-
-    await cur.scroll(-1)
-    cur.row_factory = rows.dict_row
-    assert await cur.fetchone() == {"y": "y", "z": "z"}
-
-
-async def test_row_factory_none(aconn):
-    cur = aconn.cursor(row_factory=None)
-    assert cur.row_factory is rows.tuple_row
-    await cur.execute("select 1 as a, 2 as b")
-    r = await cur.fetchone()
-    assert type(r) is tuple
-    assert r == (1, 2)
-
-
-async def test_bad_row_factory(aconn):
-    def broken_factory(cur):
-        1 / 0
-
-    cur = aconn.cursor(row_factory=broken_factory)
-    with pytest.raises(ZeroDivisionError):
-        await cur.execute("select 1")
-
-    def broken_maker(cur):
-        def make_row(seq):
-            1 / 0
-
-        return make_row
-
-    cur = aconn.cursor(row_factory=broken_maker)
-    await cur.execute("select 1")
-    with pytest.raises(ZeroDivisionError):
-        await cur.fetchone()
-
-
-async def test_scroll(aconn):
-    cur = aconn.cursor()
-    with pytest.raises(psycopg.ProgrammingError):
-        await cur.scroll(0)
-
-    await cur.execute("select generate_series(0,9)")
-    await cur.scroll(2)
-    assert await cur.fetchone() == (2,)
-    await cur.scroll(2)
-    assert await cur.fetchone() == (5,)
-    await cur.scroll(2, mode="relative")
-    assert await cur.fetchone() == (8,)
-    await cur.scroll(-1)
-    assert await cur.fetchone() == (8,)
-    await cur.scroll(-2)
-    assert await cur.fetchone() == (7,)
-    await cur.scroll(2, mode="absolute")
-    assert await cur.fetchone() == (2,)
-
-    # on the boundary
-    await cur.scroll(0, mode="absolute")
-    assert await cur.fetchone() == (0,)
-    with pytest.raises(IndexError):
-        await cur.scroll(-1, mode="absolute")
-
-    await cur.scroll(0, mode="absolute")
-    with pytest.raises(IndexError):
-        await cur.scroll(-1)
-
-    await cur.scroll(9, mode="absolute")
-    assert await cur.fetchone() == (9,)
-    with pytest.raises(IndexError):
-        await cur.scroll(10, mode="absolute")
-
-    await cur.scroll(9, mode="absolute")
-    with pytest.raises(IndexError):
-        await cur.scroll(1)
-
-    with pytest.raises(ValueError):
-        await cur.scroll(1, "wat")
-
-
 async def test_query_params_execute(aconn):
     cur = aconn.cursor()
     assert cur._query is None
@@ -571,21 +63,6 @@ async def test_query_params_execute(aconn):
     assert cur._query.params == (b"'wat'",)
 
 
-@pytest.mark.parametrize(
-    "query, params, want",
-    [
-        ("select %(x)s", {"x": 1}, (1,)),
-        ("select %(x)s, %(y)s", {"x": 1, "y": 2}, (1, 2)),
-        ("select %(x)s, %(x)s", {"x": 1}, (1, 1)),
-    ],
-)
-async def test_query_params_named(aconn, query, params, want):
-    cur = aconn.cursor()
-    await cur.execute(query, params)
-    rec = await cur.fetchone()
-    assert rec == want
-
-
 async def test_query_params_executemany(aconn):
     cur = aconn.cursor()
 
@@ -594,47 +71,6 @@ async def test_query_params_executemany(aconn):
     assert cur._query.params == (b"3", b"4")
 
 
-@pytest.mark.crdb_skip("copy")
-@pytest.mark.parametrize("ph, params", [("%s", (10,)), ("%(n)s", {"n": 10})])
-async def test_copy_out_param(aconn, ph, params):
-    cur = aconn.cursor()
-    async with cur.copy(
-        f"copy (select * from generate_series(1, {ph})) to stdout", params
-    ) as copy:
-        copy.set_types(["int4"])
-        assert await alist(copy.rows()) == [(i + 1,) for i in range(10)]
-
-    assert aconn.info.transaction_status == aconn.TransactionStatus.INTRANS
-
-
-async def test_stream(aconn):
-    cur = aconn.cursor()
-    recs = []
-    async for rec in cur.stream(
-        "select i, '2021-01-01'::date + i from generate_series(1, %s) as i",
-        [2],
-    ):
-        recs.append(rec)
-
-    assert recs == [(1, dt.date(2021, 1, 2)), (2, dt.date(2021, 1, 3))]
-
-
-async def test_str(aconn):
-    cur = aconn.cursor()
-    assert "psycopg.AsyncClientCursor" in str(cur)
-    assert "[IDLE]" in str(cur)
-    assert "[closed]" not in str(cur)
-    assert "[no result]" in str(cur)
-    await cur.execute("select 1")
-    assert "[INTRANS]" in str(cur)
-    assert "[TUPLES_OK]" in str(cur)
-    assert "[closed]" not in str(cur)
-    assert "[no result]" not in str(cur)
-    await cur.close()
-    assert "[closed]" in str(cur)
-    assert "[INTRANS]" in str(cur)
-
-
 @pytest.mark.slow
 @pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
 @pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
@@ -710,22 +146,3 @@ async def test_mogrify_badenc(aconn, encoding):
     await aconn.execute(f"set client_encoding to {encoding}")
     with pytest.raises(UnicodeEncodeError):
         aconn.cursor().mogrify("select %(s)s", {"s": "\u20ac"})
-
-
-@pytest.mark.pipeline
-async def test_message_0x33(aconn):
-    # https://github.com/psycopg/psycopg/issues/314
-    notices = []
-    aconn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
-
-    await aconn.set_autocommit(True)
-    async with aconn.pipeline():
-        cur = await aconn.execute("select 'test'")
-        assert (await cur.fetchone()) == ("test",)
-
-    assert not notices
-
-
-async def test_typeinfo(aconn):
-    info = await TypeInfo.fetch(aconn, "jsonb")
-    assert info is not None
index e7156f082efb6b62b608afc2510dd20f6addf1de..b7d3c051a678fa870919d2b26cd4ccc40245c5ec 100644 (file)
@@ -159,6 +159,7 @@ def test_execute_many_results(conn):
     assert cur.rowcount == 1
     assert cur.nextset()
     assert cur.fetchall() == [(1,), (2,), (3,)]
+    assert cur.rowcount == 3
     assert cur.nextset() is None
 
     cur.close()
@@ -232,7 +233,6 @@ def test_binary_cursor_execute(conn):
     ) as ex:
         cur = conn.cursor(binary=True)
         cur.execute("select %s, %s", [1, None])
-
     if ex:
         return
 
@@ -715,7 +715,9 @@ def test_stream_error_python_consumed(conn):
     assert conn.info.transaction_status == conn.TransactionStatus.INTRANS
 
 
-def test_stream_close(conn):
+@pytest.mark.parametrize("autocommit", [False, True])
+def test_stream_close(conn, autocommit):
+    conn.autocommit = autocommit
     cur = conn.cursor()
     with pytest.raises(psycopg.OperationalError):
         for rec in cur.stream("select generate_series(1, 3)"):
index e2081535523655f759030d6cef479548feca1195..d85a929d949ce5b1fe271a42d52e1d7ea6474528 100644 (file)
@@ -4,10 +4,11 @@ import datetime as dt
 from typing import List
 
 import psycopg
-from psycopg import pq, sql, rows
+from psycopg import sql, rows
 from psycopg.adapt import PyFormat
+from psycopg.types import TypeInfo
 
-from .utils import gc_collect, gc_count
+from .utils import alist, gc_collect, raiseif
 from .test_cursor import my_row_factory
 from .test_cursor import execmany, _execmany  # noqa: F401
 from .fix_crdb import crdb_encoding
@@ -15,19 +16,25 @@ from .fix_crdb import crdb_encoding
 execmany = execmany  # avoid F811 underneath
 
 
+@pytest.fixture(params=[psycopg.AsyncCursor, psycopg.AsyncClientCursor])
+def aconn(aconn, request, anyio_backend):
+    aconn.cursor_factory = request.param
+    return aconn
+
+
 async def test_init(aconn):
-    cur = psycopg.AsyncCursor(aconn)
+    cur = aconn.cursor_factory(aconn)
     await cur.execute("select 1")
     assert (await cur.fetchone()) == (1,)
 
     aconn.row_factory = rows.dict_row
-    cur = psycopg.AsyncCursor(aconn)
+    cur = aconn.cursor_factory(aconn)
     await cur.execute("select 1 as a")
     assert (await cur.fetchone()) == {"a": 1}
 
 
 async def test_init_factory(aconn):
-    cur = psycopg.AsyncCursor(aconn, row_factory=rows.dict_row)
+    cur = aconn.cursor_factory(aconn, row_factory=rows.dict_row)
     await cur.execute("select 1 as a")
     assert (await cur.fetchone()) == {"a": 1}
 
@@ -131,6 +138,12 @@ async def test_statusmessage(aconn):
     assert cur.statusmessage is None
 
 
+async def test_execute_sql(aconn):
+    cur = aconn.cursor()
+    await cur.execute(sql.SQL("select {value}").format(value="hello"))
+    assert (await cur.fetchone()) == ("hello",)
+
+
 async def test_execute_many_results(aconn):
     cur = aconn.cursor()
     assert cur.nextset() is None
@@ -210,8 +223,14 @@ async def test_fetchone(aconn):
 
 
 async def test_binary_cursor_execute(aconn):
-    cur = aconn.cursor(binary=True)
-    await cur.execute("select %s, %s", [1, None])
+    with raiseif(
+        aconn.cursor_factory is psycopg.AsyncClientCursor, psycopg.NotSupportedError
+    ) as ex:
+        cur = aconn.cursor(binary=True)
+        await cur.execute("select %s, %s", [1, None])
+    if ex:
+        return
+
     assert (await cur.fetchone()) == (1, None)
     assert cur.pgresult.fformat(0) == 1
     assert cur.pgresult.get_value(0, 0) == b"\x00\x01"
@@ -219,7 +238,13 @@ async def test_binary_cursor_execute(aconn):
 
 async def test_execute_binary(aconn):
     cur = aconn.cursor()
-    await cur.execute("select %s, %s", [1, None], binary=True)
+    with raiseif(
+        aconn.cursor_factory is psycopg.AsyncClientCursor, psycopg.NotSupportedError
+    ) as ex:
+        await cur.execute("select %s, %s", [1, None], binary=True)
+    if ex:
+        return
+
     assert (await cur.fetchone()) == (1, None)
     assert cur.pgresult.fformat(0) == 1
     assert cur.pgresult.get_value(0, 0) == b"\x00\x01"
@@ -451,10 +476,7 @@ select x from generate_series(4, 6) x;
 async def test_iter(aconn):
     cur = aconn.cursor()
     await cur.execute("select generate_series(1, 3)")
-    res = []
-    async for rec in cur:
-        res.append(rec)
-    assert res == [(1,), (2,), (3,)]
+    assert await alist(cur) == [(1,), (2,), (3,)]
 
 
 async def test_iter_stop(aconn):
@@ -475,6 +497,11 @@ async def test_iter_stop(aconn):
 
 async def test_row_factory(aconn):
     cur = aconn.cursor(row_factory=my_row_factory)
+
+    await cur.execute("reset search_path")
+    with pytest.raises(psycopg.ProgrammingError):
+        await cur.fetchone()
+
     await cur.execute("select 'foo' as bar")
     (r,) = await cur.fetchone()
     assert r == "FOObar"
@@ -560,32 +587,19 @@ async def test_scroll(aconn):
         await cur.scroll(1, "wat")
 
 
-async def test_query_params_execute(aconn):
-    cur = aconn.cursor()
-    assert cur._query is None
-
-    await cur.execute("select %t, %s::text", [1, None])
-    assert cur._query is not None
-    assert cur._query.query == b"select $1, $2::text"
-    assert cur._query.params == [b"1", None]
-
-    await cur.execute("select 1")
-    assert cur._query.query == b"select 1"
-    assert not cur._query.params
-
-    with pytest.raises(psycopg.DataError):
-        await cur.execute("select %t::int", ["wat"])
-
-    assert cur._query.query == b"select $1::int"
-    assert cur._query.params == [b"wat"]
-
-
-async def test_query_params_executemany(aconn):
+@pytest.mark.parametrize(
+    "query, params, want",
+    [
+        ("select %(x)s", {"x": 1}, (1,)),
+        ("select %(x)s, %(y)s", {"x": 1, "y": 2}, (1, 2)),
+        ("select %(x)s, %(x)s", {"x": 1}, (1, 1)),
+    ],
+)
+async def test_execute_params_named(aconn, query, params, want):
     cur = aconn.cursor()
-
-    await cur.executemany("select %t, %t", [[1, 2], [3, 4]])
-    assert cur._query.query == b"select $1, $2"
-    assert cur._query.params == [b"3", b"4"]
+    await cur.execute(query, params)
+    rec = await cur.fetchone()
+    assert rec == want
 
 
 async def test_stream(aconn):
@@ -602,13 +616,13 @@ async def test_stream(aconn):
 
 async def test_stream_sql(aconn):
     cur = aconn.cursor()
-    recs = []
-    async for rec in cur.stream(
-        sql.SQL(
-            "select i, '2021-01-01'::date + i from generate_series(1, {}) as i"
-        ).format(2)
-    ):
-        recs.append(rec)
+    recs = await alist(
+        cur.stream(
+            sql.SQL(
+                "select i, '2021-01-01'::date + i from generate_series(1, {}) as i"
+            ).format(2)
+        )
+    )
 
     assert recs == [(1, dt.date(2021, 1, 2)), (2, dt.date(2021, 1, 3))]
 
@@ -691,8 +705,9 @@ async def test_stream_error_python_consumed(aconn):
     assert aconn.info.transaction_status == aconn.TransactionStatus.INTRANS
 
 
-async def test_stream_close(aconn):
-    await aconn.set_autocommit(True)
+@pytest.mark.parametrize("autocommit", [False, True])
+async def test_stream_close(aconn, autocommit):
+    await aconn.set_autocommit(autocommit)
     cur = aconn.cursor()
     with pytest.raises(psycopg.OperationalError):
         async for rec in cur.stream("select generate_series(1, 3)"):
@@ -705,27 +720,33 @@ async def test_stream_close(aconn):
 
 
 async def test_stream_binary_cursor(aconn):
-    cur = aconn.cursor(binary=True)
-    recs = []
-    async for rec in cur.stream("select x::int4 from generate_series(1, 2) x"):
-        recs.append(rec)
-        assert cur.pgresult.fformat(0) == 1
-        assert cur.pgresult.get_value(0, 0) == bytes([0, 0, 0, rec[0]])
+    with raiseif(
+        aconn.cursor_factory is psycopg.AsyncClientCursor, psycopg.NotSupportedError
+    ):
+        cur = aconn.cursor(binary=True)
+        recs = []
+        async for rec in cur.stream("select x::int4 from generate_series(1, 2) x"):
+            recs.append(rec)
+            assert cur.pgresult.fformat(0) == 1
+            assert cur.pgresult.get_value(0, 0) == bytes([0, 0, 0, rec[0]])
 
-    assert recs == [(1,), (2,)]
+        assert recs == [(1,), (2,)]
 
 
 async def test_stream_execute_binary(aconn):
     cur = aconn.cursor()
     recs = []
-    async for rec in cur.stream(
-        "select x::int4 from generate_series(1, 2) x", binary=True
+    with raiseif(
+        aconn.cursor_factory is psycopg.AsyncClientCursor, psycopg.NotSupportedError
     ):
-        recs.append(rec)
-        assert cur.pgresult.fformat(0) == 1
-        assert cur.pgresult.get_value(0, 0) == bytes([0, 0, 0, rec[0]])
+        async for rec in cur.stream(
+            "select x::int4 from generate_series(1, 2) x", binary=True
+        ):
+            recs.append(rec)
+            assert cur.pgresult.fformat(0) == 1
+            assert cur.pgresult.get_value(0, 0) == bytes([0, 0, 0, rec[0]])
 
-    assert recs == [(1,), (2,)]
+        assert recs == [(1,), (2,)]
 
 
 async def test_stream_binary_cursor_text_override(aconn):
@@ -741,7 +762,6 @@ async def test_stream_binary_cursor_text_override(aconn):
 
 async def test_str(aconn):
     cur = aconn.cursor()
-    assert "psycopg.AsyncCursor" in str(cur)
     assert "[IDLE]" in str(cur)
     assert "[closed]" not in str(cur)
     assert "[no result]" in str(cur)
@@ -755,49 +775,20 @@ async def test_str(aconn):
     assert "[INTRANS]" in str(cur)
 
 
-@pytest.mark.slow
-@pytest.mark.parametrize("fmt", PyFormat)
-@pytest.mark.parametrize("fmt_out", pq.Format)
-@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
-@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
-async def test_leak(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
-    faker.format = fmt
-    faker.choose_schema(ncols=5)
-    faker.make_records(10)
-    row_factory = getattr(rows, row_factory)
-
-    async def work():
-        async with await aconn_cls.connect(dsn) as conn, conn.transaction(
-            force_rollback=True
-        ):
-            async with conn.cursor(binary=fmt_out, row_factory=row_factory) as cur:
-                await cur.execute(faker.drop_stmt)
-                await cur.execute(faker.create_stmt)
-                async with faker.find_insert_problem_async(conn):
-                    await cur.executemany(faker.insert_stmt, faker.records)
-                await cur.execute(faker.select_stmt)
-
-                if fetch == "one":
-                    while True:
-                        tmp = await cur.fetchone()
-                        if tmp is None:
-                            break
-                elif fetch == "many":
-                    while True:
-                        tmp = await cur.fetchmany(3)
-                        if not tmp:
-                            break
-                elif fetch == "all":
-                    await cur.fetchall()
-                elif fetch == "iter":
-                    async for rec in cur:
-                        pass
-
-    n = []
-    gc_collect()
-    for i in range(3):
-        await work()
-        gc_collect()
-        n.append(gc_count())
+@pytest.mark.pipeline
+async def test_message_0x33(aconn):
+    # https://github.com/psycopg/psycopg/issues/314
+    notices = []
+    aconn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+
+    await aconn.set_autocommit(True)
+    async with aconn.pipeline():
+        cur = await aconn.execute("select 'test'")
+        assert (await cur.fetchone()) == ("test",)
+
+    assert not notices
+
 
-    assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+async def test_typeinfo(aconn):
+    info = await TypeInfo.fetch(aconn, "jsonb")
+    assert info is not None
index e712cb98a218e49f03ad5e9a70915bf05e676088..e9bebecdae67982d74ae1475df9d576a6f981bd0 100644 (file)
@@ -78,7 +78,6 @@ def test_leak(conn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
                 cur.execute(faker.create_stmt)
                 with faker.find_insert_problem(conn):
                     cur.executemany(faker.insert_stmt, faker.records)
-
                 cur.execute(faker.select_stmt)
 
                 if fetch == "one":
diff --git a/tests/test_default_cursor_async.py b/tests/test_default_cursor_async.py
new file mode 100644 (file)
index 0000000..c3057f5
--- /dev/null
@@ -0,0 +1,110 @@
+"""
+Tests for psycopg.Cursor that are not supposed to pass for subclasses.
+"""
+
+import pytest
+import psycopg
+from psycopg import pq, rows
+from psycopg.adapt import PyFormat
+
+from .utils import gc_collect, gc_count
+
+
+async def test_default_cursor(aconn):
+    cur = aconn.cursor()
+    assert type(cur) is psycopg.AsyncCursor
+
+
+async def test_from_cursor_factory(aconn_cls, dsn):
+    async with await aconn_cls.connect(
+        dsn, cursor_factory=psycopg.AsyncClientCursor
+    ) as aconn:
+        cur = aconn.cursor()
+        assert type(cur) is psycopg.AsyncClientCursor
+
+
+async def test_str(aconn):
+    cur = aconn.cursor()
+    assert "psycopg.AsyncCursor" in str(cur)
+
+
+async def test_execute_many_results_param(aconn):
+    cur = aconn.cursor()
+    with pytest.raises(psycopg.errors.SyntaxError):
+        await cur.execute("select %s; select generate_series(1, %s)", ("foo", 3))
+
+
+async def test_query_params_execute(aconn):
+    cur = aconn.cursor()
+    assert cur._query is None
+
+    await cur.execute("select %t, %s::text", [1, None])
+    assert cur._query is not None
+    assert cur._query.query == b"select $1, $2::text"
+    assert cur._query.params == [b"1", None]
+
+    await cur.execute("select 1")
+    assert cur._query.query == b"select 1"
+    assert not cur._query.params
+
+    with pytest.raises(psycopg.DataError):
+        await cur.execute("select %t::int", ["wat"])
+
+    assert cur._query.query == b"select $1::int"
+    assert cur._query.params == [b"wat"]
+
+
+async def test_query_params_executemany(aconn):
+    cur = aconn.cursor()
+
+    await cur.executemany("select %t, %t", [[1, 2], [3, 4]])
+    assert cur._query.query == b"select $1, $2"
+    assert cur._query.params == [b"3", b"4"]
+
+
+@pytest.mark.slow
+@pytest.mark.parametrize("fmt", PyFormat)
+@pytest.mark.parametrize("fmt_out", pq.Format)
+@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
+@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
+async def test_leak(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
+    faker.format = fmt
+    faker.choose_schema(ncols=5)
+    faker.make_records(10)
+    row_factory = getattr(rows, row_factory)
+
+    async def work():
+        async with await aconn_cls.connect(dsn) as conn, conn.transaction(
+            force_rollback=True
+        ):
+            async with conn.cursor(binary=fmt_out, row_factory=row_factory) as cur:
+                await cur.execute(faker.drop_stmt)
+                await cur.execute(faker.create_stmt)
+                async with faker.find_insert_problem_async(conn):
+                    await cur.executemany(faker.insert_stmt, faker.records)
+                await cur.execute(faker.select_stmt)
+
+                if fetch == "one":
+                    while True:
+                        tmp = await cur.fetchone()
+                        if tmp is None:
+                            break
+                elif fetch == "many":
+                    while True:
+                        tmp = await cur.fetchmany(3)
+                        if not tmp:
+                            break
+                elif fetch == "all":
+                    await cur.fetchall()
+                elif fetch == "iter":
+                    async for rec in cur:
+                        pass
+
+    n = []
+    gc_collect()
+    for i in range(3):
+        await work()
+        gc_collect()
+        n.append(gc_count())
+
+    assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"