assert type(cur) is psycopg.ClientCursor
+def test_str(conn):
+ cur = conn.cursor()
+ assert "psycopg.%s" % psycopg.ClientCursor.__name__ in str(cur)
+
+
def test_from_cursor_factory(conn_cls, dsn):
with conn_cls.connect(dsn, cursor_factory=psycopg.ClientCursor) as conn:
cur = conn.cursor()
assert type(cur) is psycopg.AsyncClientCursor
+async def test_str(aconn):
+ cur = aconn.cursor()
+ assert "psycopg.%s" % psycopg.AsyncClientCursor.__name__ in str(cur)
+
+
async def test_from_cursor_factory(aconn_cls, dsn):
async with await aconn_cls.connect(
dsn, cursor_factory=psycopg.AsyncClientCursor
+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'test_default_cursor_async.py'
+# DO NOT CHANGE! Change the original file instead.
"""
Tests for psycopg.Cursor that are not supposed to pass for subclasses.
"""
def test_str(conn):
cur = conn.cursor()
- assert "psycopg.Cursor" in str(cur)
+ assert "psycopg.%s" % psycopg.Cursor.__name__ in str(cur)
def test_execute_many_results_param(conn):
work()
gc_collect()
n.append(gc_count())
+
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
async def test_str(aconn):
cur = aconn.cursor()
- assert "psycopg.AsyncCursor" in str(cur)
+ assert "psycopg.%s" % psycopg.AsyncCursor.__name__ in str(cur)
async def test_execute_many_results_param(aconn):
+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'test_raw_cursor_async.py'
+# DO NOT CHANGE! Change the original file instead.
import pytest
import psycopg
from psycopg import pq, rows, errors as e
@pytest.fixture
-def conn(conn):
+def conn(conn, anyio_backend):
conn.cursor_factory = psycopg.RawCursor
return conn
def test_str(conn):
cur = conn.cursor()
- assert "psycopg.RawCursor" in str(cur)
+ assert "psycopg.%s" % psycopg.RawCursor.__name__ in str(cur)
def test_sequence_only(conn):
row_factory = getattr(rows, row_factory)
def work():
- with conn_cls.connect(dsn) as conn, conn.transaction(force_rollback=True):
- with conn.cursor(binary=fmt_out, row_factory=row_factory) as cur:
- cur.execute(faker.drop_stmt)
- cur.execute(faker.create_stmt)
- with faker.find_insert_problem(conn):
- cur.executemany(faker.insert_stmt, faker.records)
- cur.execute(ph(cur, faker.select_stmt))
-
- if fetch == "one":
- while True:
- tmp = cur.fetchone()
- if tmp is None:
- break
- elif fetch == "many":
- while True:
- tmp = cur.fetchmany(3)
- if not tmp:
- break
- elif fetch == "all":
- cur.fetchall()
- elif fetch == "iter":
- for rec in cur:
- pass
+ with conn_cls.connect(dsn) as conn:
+ with conn.transaction(force_rollback=True):
+ with conn.cursor(binary=fmt_out, row_factory=row_factory) as cur:
+ cur.execute(faker.drop_stmt)
+ cur.execute(faker.create_stmt)
+ with faker.find_insert_problem(conn):
+ cur.executemany(faker.insert_stmt, faker.records)
+ cur.execute(ph(cur, faker.select_stmt))
+
+ if fetch == "one":
+ while True:
+ tmp = cur.fetchone()
+ if tmp is None:
+ break
+ elif fetch == "many":
+ while True:
+ tmp = cur.fetchmany(3)
+ if not tmp:
+ break
+ elif fetch == "all":
+ cur.fetchall()
+ elif fetch == "iter":
+ for rec in cur:
+ pass
n = []
gc_collect()
async def test_str(aconn):
cur = aconn.cursor()
- assert "psycopg.AsyncRawCursor" in str(cur)
+ assert "psycopg.%s" % psycopg.AsyncRawCursor.__name__ in str(cur)
async def test_sequence_only(aconn):
+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'test_server_cursor_async.py'
+# DO NOT CHANGE! Change the original file instead.
import pytest
import psycopg
from psycopg import rows, errors as e
from psycopg.pq import Format
+
pytestmark = pytest.mark.crdb_skip("server-side cursor")
def test_repr(conn):
cur = conn.cursor("my-name")
- assert "psycopg.ServerCursor" in str(cur)
+ assert "psycopg.%s" % psycopg.ServerCursor.__name__ in str(cur)
assert "my-name" in repr(cur)
cur.close()
with conn.cursor("foo") as cur:
assert cur._query is None
cur.execute("select generate_series(1, %s) as bar", (3,))
- assert cur._query
+ assert cur._query is not None
assert b"declare" in cur._query.query.lower()
assert b"(1, $1)" in cur._query.query.lower()
assert cur._query.params == [bytes([0, 3])] # 3 as binary int2
from psycopg import rows, errors as e
from psycopg.pq import Format
-pytestmark = [
- pytest.mark.crdb_skip("server-side cursor"),
-]
+from .utils import alist
+
+pytestmark = pytest.mark.crdb_skip("server-side cursor")
async def test_init_row_factory(aconn):
async def test_repr(aconn):
cur = aconn.cursor("my-name")
- assert "psycopg.AsyncServerCursor" in str(cur)
+ assert "psycopg.%s" % psycopg.AsyncServerCursor.__name__ in str(cur)
assert "my-name" in repr(cur)
await cur.close()
assert cur.pgresult.fformat(0) == 1
assert cur.pgresult.get_value(0, 0) == b"\x00\x00\x00\x02"
- await cur.execute("select generate_series(1, 1)")
+ await cur.execute("select generate_series(1, 1)::int4")
assert (await cur.fetchone()) == (1,)
assert cur.pgresult.fformat(0) == 0
assert cur.pgresult.get_value(0, 0) == b"1"
async def test_iter(aconn):
async with aconn.cursor("foo") as cur:
await cur.execute("select generate_series(1, %s) as bar", (3,))
- recs = []
- async for rec in cur:
- recs.append(rec)
+ recs = await alist(cur)
assert recs == [(1,), (2,), (3,)]
async with aconn.cursor("foo") as cur:
await cur.execute("select generate_series(1, %s) as bar", (3,))
assert await cur.fetchone() == (1,)
- recs = []
- async for rec in cur:
- recs.append(rec)
+ recs = await alist(cur)
assert recs == [(2,), (3,)]
await cur.execute("select generate_series(1, %s) as bar", (3,))
acommands.popall() # flush begin and other noise
- async for rec in cur:
- pass
+ await alist(cur)
cmds = acommands.popall()
assert len(cmds) == 2
for cmd in cmds:
@pytest.mark.parametrize("row_factory", ["tuple_row", "namedtuple_row"])
async def test_steal_cursor(aconn, row_factory):
cur1 = aconn.cursor()
- await cur1.execute(
- "declare test cursor without hold for select generate_series(1, 6) as s"
- )
+ await cur1.execute("declare test cursor for select generate_series(1, 6) as s")
cur2 = aconn.cursor("test", row_factory=getattr(rows, row_factory))
# can call fetch without execute
"AsyncServerCursor": "ServerCursor",
"aclose": "close",
"aclosing": "closing",
+ "acommands": "commands",
"aconn": "conn",
"aconn_cls": "conn_cls",
"aconn_set": "conn_set",
tests/test_connection_async.py \
tests/test_copy_async.py \
tests/test_cursor_async.py \
- tests/test_pipeline_async.py
+ tests/test_default_cursor_async.py \
+ tests/test_pipeline_async.py \
+ tests/test_raw_cursor_async.py \
+ tests/test_server_cursor_async.py
do
sync=${async/_async/}
echo "converting '${async}' -> '${sync}'" >&2