with conn.transaction():
yield
except psycopg.DatabaseError:
- cur = conn.cursor()
+ cur = psycopg.Cursor(conn)
# Repeat insert one field at time, until finding the wrong one
cur.execute(self.drop_stmt)
cur.execute(self.create_stmt)
async with aconn.transaction():
yield
except psycopg.DatabaseError:
- acur = aconn.cursor()
+ acur = psycopg.AsyncCursor(aconn)
# Repeat insert one field at time, until finding the wrong one
await acur.execute(self.drop_stmt)
await acur.execute(self.create_stmt)
Tests common to psycopg.Cursor and its subclasses.
"""
+import re
import pickle
import weakref
import datetime as dt
-from typing import List, Union
+from typing import Any, List, Match, Union
from contextlib import closing
import pytest
return conn
+def ph(cur: Any, query: str) -> str:
+ """Change placeholders in a query from %s to $n if testing a raw cursor"""
+ if not isinstance(cur, (psycopg.RawCursor, psycopg.AsyncRawCursor)):
+ return query
+
+ if "%(" in query:
+ raise pytest.skip("RawCursor only supports positional placeholders")
+
+ n = 1
+
+ def s(m: Match[str]) -> str:
+ nonlocal n
+ rv = f"${n}"
+ n += 1
+ return rv
+
+ return re.sub(r"(?<!%)(%[bst])", s, query)
+
+
def test_init(conn):
cur = conn.cursor_factory(conn)
cur.execute("select 1")
def test_execute_sequence(conn):
cur = conn.cursor()
- rv = cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
+ rv = cur.execute(ph(cur, "select %s::int, %s::text, %s::text"), [1, "foo", None])
assert rv is cur
assert len(cur._results) == 1
assert cur.pgresult.get_value(0, 0) == b"1"
def test_execute_type_change(conn):
# issue #112
conn.execute("create table bug_112 (num integer)")
- sql = "insert into bug_112 (num) values (%s)"
cur = conn.cursor()
+ sql = ph(cur, "insert into bug_112 (num) values (%s)")
cur.execute(sql, (1,))
cur.execute(sql, (100_000,))
cur.execute("select num from bug_112 order by num")
def test_executemany_type_change(conn):
conn.execute("create table bug_112 (num integer)")
- sql = "insert into bug_112 (num) values (%s)"
cur = conn.cursor()
+ sql = ph(cur, "insert into bug_112 (num) values (%s)")
cur.executemany(sql, [(1,), (100_000,)])
cur.execute("select num from bug_112 order by num")
assert cur.fetchall() == [(1,), (100_000,)]
def test_fetchone(conn):
cur = conn.cursor()
- cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
+ cur.execute(ph(cur, "select %s::int, %s::text, %s::text"), [1, "foo", None])
assert cur.pgresult.fformat(0) == 0
row = cur.fetchone()
conn.cursor_factory is psycopg.ClientCursor, psycopg.NotSupportedError
) as ex:
cur = conn.cursor(binary=True)
- cur.execute("select %s, %s", [1, None])
+ cur.execute(ph(cur, "select %s, %s"), [1, None])
if ex:
return
with raiseif(
conn.cursor_factory is psycopg.ClientCursor, psycopg.NotSupportedError
) as ex:
- cur.execute("select %s, %s", [1, None], binary=True)
+ cur.execute(ph(cur, "select %s, %s"), [1, None], binary=True)
if ex:
return
def test_binary_cursor_text_override(conn):
cur = conn.cursor(binary=True)
- cur.execute("select %s, %s", [1, None], binary=False)
+ cur.execute(ph(cur, "select %s, %s"), [1, None], binary=False)
assert cur.fetchone() == (1, None)
assert cur.pgresult.fformat(0) == 0
assert cur.pgresult.get_value(0, 0) == b"1"
def test_executemany(conn, execmany):
cur = conn.cursor()
cur.executemany(
- "insert into execmany(num, data) values (%s, %s)",
+ ph(cur, "insert into execmany(num, data) values (%s, %s)"),
[(10, "hello"), (20, "world")],
)
cur.execute("select num, data from execmany order by 1")
def test_executemany_name(conn, execmany):
cur = conn.cursor()
cur.executemany(
- "insert into execmany(num, data) values (%(num)s, %(data)s)",
+ ph(cur, "insert into execmany(num, data) values (%(num)s, %(data)s)"),
[{"num": 11, "data": "hello", "x": 1}, {"num": 21, "data": "world"}],
)
cur.execute("select num, data from execmany order by 1")
def test_executemany_no_data(conn, execmany):
cur = conn.cursor()
- cur.executemany("insert into execmany(num, data) values (%s, %s)", [])
+ cur.executemany(ph(cur, "insert into execmany(num, data) values (%s, %s)"), [])
assert cur.rowcount == 0
def test_executemany_rowcount(conn, execmany):
cur = conn.cursor()
cur.executemany(
- "insert into execmany(num, data) values (%s, %s)",
+ ph(cur, "insert into execmany(num, data) values (%s, %s)"),
[(10, "hello"), (20, "world")],
)
assert cur.rowcount == 2
def test_executemany_returning(conn, execmany):
cur = conn.cursor()
cur.executemany(
- "insert into execmany(num, data) values (%s, %s) returning num",
+ ph(cur, "insert into execmany(num, data) values (%s, %s) returning num"),
[(10, "hello"), (20, "world")],
returning=True,
)
def test_executemany_returning_discard(conn, execmany):
cur = conn.cursor()
cur.executemany(
- "insert into execmany(num, data) values (%s, %s) returning num",
+ ph(cur, "insert into execmany(num, data) values (%s, %s) returning num"),
[(10, "hello"), (20, "world")],
)
assert cur.rowcount == 2
def test_executemany_no_result(conn, execmany):
cur = conn.cursor()
cur.executemany(
- "insert into execmany(num, data) values (%s, %s)",
+ ph(cur, "insert into execmany(num, data) values (%s, %s)"),
[(10, "hello"), (20, "world")],
returning=True,
)
def test_executemany_rowcount_no_hit(conn, execmany):
cur = conn.cursor()
- cur.executemany("delete from execmany where id = %s", [(-1,), (-2,)])
+ cur.executemany(ph(cur, "delete from execmany where id = %s"), [(-1,), (-2,)])
assert cur.rowcount == 0
- cur.executemany("delete from execmany where id = %s", [])
+ cur.executemany(ph(cur, "delete from execmany where id = %s"), [])
assert cur.rowcount == 0
- cur.executemany("delete from execmany where id = %s returning num", [(-1,), (-2,)])
+ cur.executemany(
+ ph(cur, "delete from execmany where id = %s returning num"), [(-1,), (-2,)]
+ )
assert cur.rowcount == 0
def test_executemany_badquery(conn, query):
cur = conn.cursor()
with pytest.raises(psycopg.DatabaseError):
- cur.executemany(query, [(10, "hello"), (20, "world")])
+ cur.executemany(ph(cur, query), [(10, "hello"), (20, "world")])
@pytest.mark.parametrize("fmt_in", PyFormat)
cur = conn.cursor()
cur.execute("create table testmany (a bigint, b bigint)")
cur.executemany(
- f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})",
+ ph(cur, f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})"),
[[1, None], [3, 4]],
)
with pytest.raises((psycopg.DataError, psycopg.ProgrammingError)):
)
def test_execute_params_named(conn, query, params, want):
cur = conn.cursor()
- cur.execute(query, params)
+ cur.execute(ph(cur, query), params)
rec = cur.fetchone()
assert rec == want
cur = conn.cursor()
recs = []
for rec in cur.stream(
- "select i, '2021-01-01'::date + i from generate_series(1, %s) as i",
+ ph(cur, "select i, '2021-01-01'::date + i from generate_series(1, %s) as i"),
[2],
):
recs.append(rec)
from psycopg.types import TypeInfo
from .utils import alist, gc_collect, raiseif
-from .test_cursor import my_row_factory
+from .test_cursor import my_row_factory, ph
from .test_cursor import execmany, _execmany # noqa: F401
from .fix_crdb import crdb_encoding
async def test_execute_sequence(aconn):
cur = aconn.cursor()
- rv = await cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
+ rv = await cur.execute(
+ ph(cur, "select %s::int, %s::text, %s::text"), [1, "foo", None]
+ )
assert rv is cur
assert len(cur._results) == 1
assert cur.pgresult.get_value(0, 0) == b"1"
async def test_execute_type_change(aconn):
# issue #112
await aconn.execute("create table bug_112 (num integer)")
- sql = "insert into bug_112 (num) values (%s)"
cur = aconn.cursor()
+ sql = ph(cur, "insert into bug_112 (num) values (%s)")
await cur.execute(sql, (1,))
await cur.execute(sql, (100_000,))
await cur.execute("select num from bug_112 order by num")
async def test_executemany_type_change(aconn):
await aconn.execute("create table bug_112 (num integer)")
- sql = "insert into bug_112 (num) values (%s)"
cur = aconn.cursor()
+ sql = ph(cur, "insert into bug_112 (num) values (%s)")
await cur.executemany(sql, [(1,), (100_000,)])
await cur.execute("select num from bug_112 order by num")
assert (await cur.fetchall()) == [(1,), (100_000,)]
async def test_fetchone(aconn):
cur = aconn.cursor()
- await cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
+ await cur.execute(ph(cur, "select %s::int, %s::text, %s::text"), [1, "foo", None])
assert cur.pgresult.fformat(0) == 0
row = await cur.fetchone()
aconn.cursor_factory is psycopg.AsyncClientCursor, psycopg.NotSupportedError
) as ex:
cur = aconn.cursor(binary=True)
- await cur.execute("select %s, %s", [1, None])
+ await cur.execute(ph(cur, "select %s, %s"), [1, None])
if ex:
return
with raiseif(
aconn.cursor_factory is psycopg.AsyncClientCursor, psycopg.NotSupportedError
) as ex:
- await cur.execute("select %s, %s", [1, None], binary=True)
+ await cur.execute(ph(cur, "select %s, %s"), [1, None], binary=True)
if ex:
return
async def test_binary_cursor_text_override(aconn):
cur = aconn.cursor(binary=True)
- await cur.execute("select %s, %s", [1, None], binary=False)
+ await cur.execute(ph(cur, "select %s, %s"), [1, None], binary=False)
assert (await cur.fetchone()) == (1, None)
assert cur.pgresult.fformat(0) == 0
assert cur.pgresult.get_value(0, 0) == b"1"
async def test_executemany(aconn, execmany):
cur = aconn.cursor()
await cur.executemany(
- "insert into execmany(num, data) values (%s, %s)",
+ ph(cur, "insert into execmany(num, data) values (%s, %s)"),
[(10, "hello"), (20, "world")],
)
await cur.execute("select num, data from execmany order by 1")
async def test_executemany_name(aconn, execmany):
cur = aconn.cursor()
await cur.executemany(
- "insert into execmany(num, data) values (%(num)s, %(data)s)",
+ ph(cur, "insert into execmany(num, data) values (%(num)s, %(data)s)"),
[{"num": 11, "data": "hello", "x": 1}, {"num": 21, "data": "world"}],
)
await cur.execute("select num, data from execmany order by 1")
async def test_executemany_no_data(aconn, execmany):
cur = aconn.cursor()
- await cur.executemany("insert into execmany(num, data) values (%s, %s)", [])
+ await cur.executemany(
+ ph(cur, "insert into execmany(num, data) values (%s, %s)"), []
+ )
assert cur.rowcount == 0
async def test_executemany_rowcount(aconn, execmany):
cur = aconn.cursor()
await cur.executemany(
- "insert into execmany(num, data) values (%s, %s)",
+ ph(cur, "insert into execmany(num, data) values (%s, %s)"),
[(10, "hello"), (20, "world")],
)
assert cur.rowcount == 2
async def test_executemany_returning(aconn, execmany):
cur = aconn.cursor()
await cur.executemany(
- "insert into execmany(num, data) values (%s, %s) returning num",
+ ph(cur, "insert into execmany(num, data) values (%s, %s) returning num"),
[(10, "hello"), (20, "world")],
returning=True,
)
async def test_executemany_returning_discard(aconn, execmany):
cur = aconn.cursor()
await cur.executemany(
- "insert into execmany(num, data) values (%s, %s) returning num",
+ ph(cur, "insert into execmany(num, data) values (%s, %s) returning num"),
[(10, "hello"), (20, "world")],
)
assert cur.rowcount == 2
async def test_executemany_no_result(aconn, execmany):
cur = aconn.cursor()
await cur.executemany(
- "insert into execmany(num, data) values (%s, %s)",
+ ph(cur, "insert into execmany(num, data) values (%s, %s)"),
[(10, "hello"), (20, "world")],
returning=True,
)
async def test_executemany_rowcount_no_hit(aconn, execmany):
cur = aconn.cursor()
- await cur.executemany("delete from execmany where id = %s", [(-1,), (-2,)])
+ await cur.executemany(ph(cur, "delete from execmany where id = %s"), [(-1,), (-2,)])
assert cur.rowcount == 0
- await cur.executemany("delete from execmany where id = %s", [])
+ await cur.executemany(ph(cur, "delete from execmany where id = %s"), [])
assert cur.rowcount == 0
await cur.executemany(
- "delete from execmany where id = %s returning num", [(-1,), (-2,)]
+ ph(cur, "delete from execmany where id = %s returning num"), [(-1,), (-2,)]
)
assert cur.rowcount == 0
async def test_executemany_badquery(aconn, query):
cur = aconn.cursor()
with pytest.raises(psycopg.DatabaseError):
- await cur.executemany(query, [(10, "hello"), (20, "world")])
+ await cur.executemany(ph(cur, query), [(10, "hello"), (20, "world")])
@pytest.mark.parametrize("fmt_in", PyFormat)
cur = aconn.cursor()
await cur.execute("create table testmany (a bigint, b bigint)")
await cur.executemany(
- f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})",
+ ph(cur, f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})"),
[[1, None], [3, 4]],
)
with pytest.raises((psycopg.DataError, psycopg.ProgrammingError)):
await cur.executemany(
- f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})",
+ ph(cur, f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})"),
[[1, ""], [3, 4]],
)
)
async def test_execute_params_named(aconn, query, params, want):
cur = aconn.cursor()
- await cur.execute(query, params)
+ await cur.execute(ph(cur, query), params)
rec = await cur.fetchone()
assert rec == want
cur = aconn.cursor()
recs = []
async for rec in cur.stream(
- "select i, '2021-01-01'::date + i from generate_series(1, %s) as i",
+ ph(cur, "select i, '2021-01-01'::date + i from generate_series(1, %s) as i"),
[2],
):
recs.append(rec)
--- /dev/null
+import pytest
+import psycopg
+from psycopg import pq, rows, errors as e
+from psycopg.adapt import PyFormat
+
+from .test_cursor import ph
+from .utils import gc_collect, gc_count
+
+
+@pytest.fixture
+def conn(conn):
+ conn.cursor_factory = psycopg.RawCursor
+ return conn
+
+
+def test_default_cursor(conn):
+ cur = conn.cursor()
+ assert type(cur) is psycopg.RawCursor
+
+
+def test_str(conn):
+ cur = conn.cursor()
+ assert "psycopg.RawCursor" in str(cur)
+
+
+def test_sequence_only(conn):
+ cur = conn.cursor()
+ cur.execute("select 1", ())
+ assert cur.fetchone() == (1,)
+
+ with pytest.raises(TypeError, match="sequence"):
+ cur.execute("select 1", {})
+
+
+def test_execute_many_results_param(conn):
+ cur = conn.cursor()
+ # Postgres raises SyntaxError, CRDB raises InvalidPreparedStatementDefinition
+ with pytest.raises((e.SyntaxError, e.InvalidPreparedStatementDefinition)):
+ cur.execute("select $1; select generate_series(1, $2)", ("foo", 3))
+
+
+def test_query_params_execute(conn):
+ cur = conn.cursor()
+ assert cur._query is None
+
+ cur.execute("select $1, $2::text", [1, None])
+ assert cur._query is not None
+ assert cur._query.query == b"select $1, $2::text"
+ assert cur._query.params == [b"\x00\x01", None]
+
+ cur.execute("select 1")
+ assert cur._query.query == b"select 1"
+ assert not cur._query.params
+
+ with pytest.raises(psycopg.DataError):
+ cur.execute("select $1::int", ["wat"])
+
+ assert cur._query.query == b"select $1::int"
+ assert cur._query.params == [b"wat"]
+
+
+def test_query_params_executemany(conn):
+ cur = conn.cursor()
+
+ cur.executemany("select $1, $2", [[1, 2], [3, 4]])
+ assert cur._query.query == b"select $1, $2"
+ assert cur._query.params == [b"\x00\x03", b"\x00\x04"]
+
+
+@pytest.mark.slow
+@pytest.mark.parametrize("fmt", PyFormat)
+@pytest.mark.parametrize("fmt_out", pq.Format)
+@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
+@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
+def test_leak(conn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
+ faker.format = fmt
+ faker.choose_schema(ncols=5)
+ faker.make_records(10)
+ row_factory = getattr(rows, row_factory)
+
+ def work():
+ with conn_cls.connect(dsn) as conn, conn.transaction(force_rollback=True):
+ with conn.cursor(binary=fmt_out, row_factory=row_factory) as cur:
+ cur.execute(faker.drop_stmt)
+ cur.execute(faker.create_stmt)
+ with faker.find_insert_problem(conn):
+ cur.executemany(faker.insert_stmt, faker.records)
+ cur.execute(ph(cur, faker.select_stmt))
+
+ if fetch == "one":
+ while True:
+ tmp = cur.fetchone()
+ if tmp is None:
+ break
+ elif fetch == "many":
+ while True:
+ tmp = cur.fetchmany(3)
+ if not tmp:
+ break
+ elif fetch == "all":
+ cur.fetchall()
+ elif fetch == "iter":
+ for rec in cur:
+ pass
+
+ n = []
+ gc_collect()
+ for i in range(3):
+ work()
+ gc_collect()
+ n.append(gc_count())
+ assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
--- /dev/null
+import pytest
+import psycopg
+from psycopg import pq, rows, errors as e
+from psycopg.adapt import PyFormat
+
+from .test_cursor import ph
+from .utils import gc_collect, gc_count
+
+
+@pytest.fixture
+async def aconn(aconn, anyio_backend):
+ aconn.cursor_factory = psycopg.AsyncRawCursor
+ return aconn
+
+
+async def test_default_cursor(aconn):
+ cur = aconn.cursor()
+ assert type(cur) is psycopg.AsyncRawCursor
+
+
+async def test_str(aconn):
+ cur = aconn.cursor()
+ assert "psycopg.AsyncRawCursor" in str(cur)
+
+
+async def test_sequence_only(aconn):
+ cur = aconn.cursor()
+ await cur.execute("select 1", ())
+ assert await cur.fetchone() == (1,)
+
+ with pytest.raises(TypeError, match="sequence"):
+ await cur.execute("select 1", {})
+
+
+async def test_execute_many_results_param(aconn):
+ cur = aconn.cursor()
+ # Postgres raises SyntaxError, CRDB raises InvalidPreparedStatementDefinition
+ with pytest.raises((e.SyntaxError, e.InvalidPreparedStatementDefinition)):
+ await cur.execute("select $1; select generate_series(1, $2)", ("foo", 3))
+
+
+async def test_query_params_execute(aconn):
+ cur = aconn.cursor()
+ assert cur._query is None
+
+ await cur.execute("select $1, $2::text", [1, None])
+ assert cur._query is not None
+ assert cur._query.query == b"select $1, $2::text"
+ assert cur._query.params == [b"\x00\x01", None]
+
+ await cur.execute("select 1")
+ assert cur._query.query == b"select 1"
+ assert not cur._query.params
+
+ with pytest.raises(psycopg.DataError):
+ await cur.execute("select $1::int", ["wat"])
+
+ assert cur._query.query == b"select $1::int"
+ assert cur._query.params == [b"wat"]
+
+
+async def test_query_params_executemany(aconn):
+ cur = aconn.cursor()
+
+ await cur.executemany("select $1, $2", [[1, 2], [3, 4]])
+ assert cur._query.query == b"select $1, $2"
+ assert cur._query.params == [b"\x00\x03", b"\x00\x04"]
+
+
+@pytest.mark.slow
+@pytest.mark.parametrize("fmt", PyFormat)
+@pytest.mark.parametrize("fmt_out", pq.Format)
+@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
+@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
+async def test_leak(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
+ faker.format = fmt
+ faker.choose_schema(ncols=5)
+ faker.make_records(10)
+ row_factory = getattr(rows, row_factory)
+
+ async def work():
+ async with await aconn_cls.connect(dsn) as aconn:
+ async with aconn.transaction(force_rollback=True):
+ async with aconn.cursor(binary=fmt_out, row_factory=row_factory) as cur:
+ await cur.execute(faker.drop_stmt)
+ await cur.execute(faker.create_stmt)
+ async with faker.find_insert_problem_async(aconn):
+ await cur.executemany(faker.insert_stmt, faker.records)
+ await cur.execute(ph(cur, faker.select_stmt))
+
+ if fetch == "one":
+ while True:
+ tmp = await cur.fetchone()
+ if tmp is None:
+ break
+ elif fetch == "many":
+ while True:
+ tmp = await cur.fetchmany(3)
+ if not tmp:
+ break
+ elif fetch == "all":
+ await cur.fetchall()
+ elif fetch == "iter":
+ async for rec in cur:
+ pass
+
+ n = []
+ gc_collect()
+ for i in range(3):
+ await work()
+ gc_collect()
+ n.append(gc_count())
+ assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"