if isinstance(name, Composable):
name = name.as_string(conn)
- cur = await conn.cursor(binary=True)
+ cur = conn.cursor(binary=True)
await cur.execute(cls._info_query, {"name": name})
recs = await cur.fetchall()
fields = [d[0] for d in cur.description or ()]
self.pgconn.finish()
@overload
- async def cursor(self, *, binary: bool = False) -> AsyncCursor:
+ def cursor(self, *, binary: bool = False) -> AsyncCursor:
...
@overload
- async def cursor(
- self, name: str, *, binary: bool = False
- ) -> AsyncNamedCursor:
+ def cursor(self, name: str, *, binary: bool = False) -> AsyncNamedCursor:
...
- async def cursor(
+ def cursor(
self, name: str = "", *, binary: bool = False
) -> Union[AsyncCursor, AsyncNamedCursor]:
"""
params: Optional[Params] = None,
prepare: Optional[bool] = None,
) -> AsyncCursor:
- cur = await self.cursor()
+ cur = self.cursor()
return await cur.execute(query, params, prepare=prepare)
async def commit(self) -> None:
async def runner():
nonlocal stop
- cur = await aconn.cursor()
+ cur = aconn.cursor()
for i in range(1000):
await cur.execute("select %s;", (i,))
await aconn.commit()
async def test_concurrent_execution(dsn):
async def worker():
cnn = await psycopg3.AsyncConnection.connect(dsn)
- cur = await cnn.cursor()
+ cur = cnn.cursor()
await cur.execute("select pg_sleep(0.5)")
await cur.close()
await cnn.close()
npid = nconn.pgconn.backend_pid
async def notifier():
- cur = await nconn.cursor()
+ cur = nconn.cursor()
await asyncio.sleep(0.25)
await cur.execute("notify foo, '1'")
await asyncio.sleep(0.25)
async def receiver():
await aconn.set_autocommit(True)
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("listen foo")
gen = aconn.notifies()
async for n in gen:
errors.append(exc)
async def worker():
- cur = await aconn.cursor()
+ cur = aconn.cursor()
with pytest.raises(psycopg3.DatabaseError):
await cur.execute("select pg_sleep(2)")
# still working
await aconn.rollback()
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("select 1")
assert await cur.fetchone() == (1,)
assert aconn.closed
assert aconn.pgconn.status == aconn.ConnStatus.BAD
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await aconn.close()
assert aconn.closed
async def test_context_commit(aconn, dsn):
async with aconn:
- async with await aconn.cursor() as cur:
+ async with aconn.cursor() as cur:
await cur.execute("drop table if exists textctx")
await cur.execute("create table textctx ()")
assert aconn.closed
async with await psycopg3.AsyncConnection.connect(dsn) as aconn:
- async with await aconn.cursor() as cur:
+ async with aconn.cursor() as cur:
await cur.execute("select * from textctx")
assert await cur.fetchall() == []
async def test_context_rollback(aconn, dsn):
- async with await aconn.cursor() as cur:
+ async with aconn.cursor() as cur:
await cur.execute("drop table if exists textctx")
await aconn.commit()
with pytest.raises(ZeroDivisionError):
async with aconn:
- async with await aconn.cursor() as cur:
+ async with aconn.cursor() as cur:
await cur.execute("create table textctx ()")
1 / 0
assert aconn.closed
async with await psycopg3.AsyncConnection.connect(dsn) as aconn:
- async with await aconn.cursor() as cur:
+ async with aconn.cursor() as cur:
with pytest.raises(UndefinedTable):
await cur.execute("select * from textctx")
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")
- cur = await aconn.cursor()
+ cur = aconn.cursor()
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
await cur.execute("insert into foo values (1)")
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")
- cur = await aconn.cursor()
+ cur = aconn.cursor()
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
await cur.execute("insert into foo values (1)")
await aconn.set_autocommit(True)
assert aconn.autocommit
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("select 1")
assert await cur.fetchone() == (1,)
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
async def test_autocommit_intrans(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("select 1")
assert await cur.fetchone() == (1,)
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
async def test_autocommit_inerror(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
with pytest.raises(psycopg3.DatabaseError):
await cur.execute("meh")
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INERROR
async def test_get_encoding(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("show client_encoding")
(enc,) = await cur.fetchone()
assert aconn.client_encoding == encodings.pg2py(enc)
assert aconn.client_encoding != newenc
await aconn.set_client_encoding(newenc)
assert aconn.client_encoding == newenc
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("show client_encoding")
(enc,) = await cur.fetchone()
assert encodings.pg2py(enc) == newenc
async def test_set_encoding_unsupported(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("set client_encoding to EUC_TW")
with pytest.raises(psycopg3.NotSupportedError):
await cur.execute("select 'x'")
async def test_broken_connection(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
with pytest.raises(psycopg3.DatabaseError):
await cur.execute("select pg_terminate_backend(pg_backend_pid())")
assert aconn.closed
aconn.add_notice_handler(lambda diag: severities.append(diag.severity))
aconn.pgconn.exec_(b"set client_min_messages to notice")
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute(
"do $$begin raise notice 'hello notice'; end$$ language plpgsql"
)
aconn.add_notify_handler(lambda n: nots2.append(n))
await aconn.set_autocommit(True)
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("listen foo")
await cur.execute("notify foo, 'n1'")
else:
want = sample_binary_rows
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy(
f"copy ({sample_values}) to stdout (format {format.name})"
) as copy:
else:
want = sample_binary_rows
- cur = await aconn.cursor()
+ cur = aconn.cursor()
got = []
async with cur.copy(
f"copy ({sample_values}) to stdout (format {format.name})"
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
async def test_read_rows(aconn, format):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy(
f"copy ({sample_values}) to stdout (format {format.name})"
) as copy:
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
async def test_rows(aconn, format):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy(
f"copy ({sample_values}) to stdout (format {format.name})"
) as copy:
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
async def test_copy_out_allchars(aconn, format):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
chars = list(map(chr, range(1, 256))) + [eur]
await aconn.set_client_encoding("utf8")
rows = []
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
async def test_read_row_notypes(aconn, format):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy(
f"copy ({sample_values}) to stdout (format {format.name})"
) as copy:
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
async def test_rows_notypes(aconn, format):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy(
f"copy ({sample_values}) to stdout (format {format.name})"
) as copy:
@pytest.mark.parametrize("err", [-1, 1])
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
async def test_copy_out_badntypes(aconn, format, err):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy(
f"copy ({sample_values}) to stdout (format {format.name})"
) as copy:
[(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")],
)
async def test_copy_in_buffers(aconn, format, buffer):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
async with cur.copy(
f"copy copy_in from stdin (format {format.name})"
async def test_copy_in_buffers_pg_error(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
with pytest.raises(e.UniqueViolation):
async with cur.copy("copy copy_in from stdin (format text)") as copy:
async def test_copy_bad_result(aconn):
await aconn.set_autocommit(True)
- cur = await aconn.cursor()
+ cur = aconn.cursor()
with pytest.raises(e.SyntaxError):
async with cur.copy("wat"):
async def test_copy_in_str(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
async with cur.copy("copy copy_in from stdin (format text)") as copy:
await copy.write(sample_text.decode("utf8"))
async def test_copy_in_str_binary(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
with pytest.raises(e.QueryCanceled):
async with cur.copy("copy copy_in from stdin (format binary)") as copy:
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
async def test_copy_in_empty(aconn, format):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
async with cur.copy(f"copy copy_in from stdin (format {format.name})"):
pass
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
async def test_copy_in_error_empty(aconn, format):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
with pytest.raises(e.QueryCanceled) as exc:
async with cur.copy(f"copy copy_in from stdin (format {format.name})"):
async def test_copy_in_buffers_with_pg_error(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
with pytest.raises(e.UniqueViolation):
async with cur.copy("copy copy_in from stdin (format text)") as copy:
async def test_copy_in_buffers_with_py_error(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
with pytest.raises(e.QueryCanceled) as exc:
async with cur.copy("copy copy_in from stdin (format text)") as copy:
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
async def test_copy_in_records(aconn, format):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
async with cur.copy(
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
async def test_copy_in_records_binary(aconn, format):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await ensure_table(cur, "col1 serial primary key, col2 int, data text")
async with cur.copy(
async def test_copy_in_allchars(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
await aconn.set_client_encoding("utf8")
# Roundtrip from file to database to file blockwise
gen = DataGenerator(aconn, nrecs=1024, srec=10 * 1024)
await gen.ensure_table()
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy("copy copy_in from stdin") as copy:
for block in gen.blocks():
await copy.write(block)
# Roundtrip from file to database to file blockwise
gen = DataGenerator(aconn, nrecs=1024, srec=10 * 1024)
await gen.ensure_table()
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy("copy copy_in from stdin") as copy:
for block in gen.blocks():
await copy.write(block.encode("utf8"))
aconn, nrecs=4 * 1024, srec=10 * 1024, block_size=20 * 1024 * 1024
)
await gen.ensure_table()
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy("copy copy_in from stdin") as copy:
for block in gen.blocks():
await copy.write(block)
gen = DataGenerator(aconn, nrecs=3, srec=10)
await gen.ensure_table()
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy("copy copy_in from stdin") as copy:
for block in gen.blocks():
await copy.write(block)
async def test_copy_query(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy("copy (select 1) to stdout") as copy:
assert cur.query == b"copy (select 1) to stdout"
assert cur.params is None
async def test_cant_reenter(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy("copy (select 1) to stdout") as copy:
async for record in copy:
pass
async def test_str(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
async with cur.copy("copy (select 1) to stdout") as copy:
assert "[ACTIVE]" in str(copy)
async for record in copy:
[(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")],
)
async def test_worker_life(aconn, format, buffer):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
async with cur.copy(
f"copy copy_in from stdin (format {format.name})"
n = []
for i in range(3):
async with await psycopg3.AsyncConnection.connect(dsn) as conn:
- async with await conn.cursor(binary=fmt) as cur:
+ async with conn.cursor(binary=fmt) as cur:
await cur.execute(faker.drop_stmt)
await cur.execute(faker.create_stmt)
await cur.executemany(faker.insert_stmt, faker.records)
n = []
for i in range(3):
async with await psycopg3.AsyncConnection.connect(dsn) as conn:
- async with await conn.cursor(binary=fmt) as cur:
+ async with conn.cursor(binary=fmt) as cur:
await cur.execute(faker.drop_stmt)
await cur.execute(faker.create_stmt)
self.block_size = block_size
async def ensure_table(self):
- cur = await self.conn.cursor()
+ cur = self.conn.cursor()
await ensure_table(cur, "id integer primary key, data text")
def records(self):
yield block
async def assert_data(self):
- cur = await self.conn.cursor()
+ cur = self.conn.cursor()
await cur.execute("select id, data from copy_in order by id")
for record in self.records():
assert record == await cur.fetchone()
async def test_close(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
assert not cur.closed
await cur.close()
assert cur.closed
async def test_context(aconn):
- async with (await aconn.cursor()) as cur:
+ async with aconn.cursor() as cur:
assert not cur.closed
assert cur.closed
async def test_weakref(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
w = weakref.ref(cur)
await cur.close()
del cur
async def test_status(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
assert cur.status is None
await cur.execute("reset all")
assert cur.status == cur.ExecStatus.COMMAND_OK
async def test_execute_many_results(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
assert cur.nextset() is None
rv = await cur.execute("select 'foo'; select generate_series(1,3)")
async def test_execute_sequence(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
rv = await cur.execute(
"select %s::int, %s::text, %s::text", [1, "foo", None]
)
@pytest.mark.parametrize("query", ["", " ", ";"])
async def test_execute_empty_query(aconn, query):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute(query)
assert cur.status == cur.ExecStatus.EMPTY_QUERY
with pytest.raises(psycopg3.ProgrammingError):
"query", ["copy testcopy from stdin", "copy testcopy to stdout"]
)
async def test_execute_copy(aconn, query):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("create table testcopy (id int)")
with pytest.raises(psycopg3.ProgrammingError):
await cur.execute(query)
async def test_fetchone(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
assert cur.pgresult.fformat(0) == 0
async def test_execute_binary_result(aconn):
- cur = await aconn.cursor(binary=True)
+ cur = aconn.cursor(binary=True)
await cur.execute("select %s::text, %s::text", ["foo", None])
assert cur.pgresult.fformat(0) == 1
@pytest.mark.parametrize("encoding", ["utf8", "latin9"])
async def test_query_encode(aconn, encoding):
await aconn.set_client_encoding(encoding)
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("select '\u20ac'")
(res,) = await cur.fetchone()
assert res == "\u20ac"
async def test_query_badenc(aconn):
await aconn.set_client_encoding("latin1")
- cur = await aconn.cursor()
+ cur = aconn.cursor()
with pytest.raises(UnicodeEncodeError):
await cur.execute("select '\u20ac'")
async def test_executemany(aconn, execmany):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.executemany(
"insert into execmany(num, data) values (%s, %s)",
[(10, "hello"), (20, "world")],
async def test_executemany_name(aconn, execmany):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.executemany(
"insert into execmany(num, data) values (%(num)s, %(data)s)",
[{"num": 11, "data": "hello", "x": 1}, {"num": 21, "data": "world"}],
async def test_executemany_rowcount(aconn, execmany):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.executemany(
"insert into execmany(num, data) values (%s, %s)",
[(10, "hello"), (20, "world")],
async def test_executemany_returning_rowcount(aconn, execmany):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.executemany(
"insert into execmany(num, data) values (%s, %s) returning num",
[(10, "hello"), (20, "world")],
],
)
async def test_executemany_badquery(aconn, query):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
with pytest.raises(psycopg3.DatabaseError):
await cur.executemany(query, [(10, "hello"), (20, "world")])
@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
async def test_executemany_null_first(aconn, fmt_in):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("create table testmany (a bigint, b bigint)")
await cur.executemany(
f"insert into testmany values (%{fmt_in}, %{fmt_in})",
async def test_rowcount(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("select 1 from generate_series(1, 0)")
assert cur.rowcount == 0
async def test_rownumber(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
assert cur.rownumber is None
await cur.execute("select 1 from generate_series(1, 42)")
async def test_iter(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("select generate_series(1, 3)")
res = []
async for rec in cur:
async def test_iter_stop(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("select generate_series(1, 3)")
async for rec in cur:
assert rec == (1,)
async def test_scroll(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
with pytest.raises(psycopg3.ProgrammingError):
await cur.scroll(0)
async def test_query_params_execute(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
assert cur.query is None
assert cur.params is None
async def test_query_params_executemany(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.executemany("select %t, %t", [[1, 2], [3, 4]])
assert cur.query == b"select $1, $2"
async def test_stream(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
recs = []
async for rec in cur.stream(
"select i, '2021-01-01'::date + i from generate_series(1, %s) as i",
async def test_stream_sql(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
recs = []
async for rec in cur.stream(
sql.SQL(
],
)
async def test_stream_badquery(aconn, query):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
with pytest.raises(psycopg3.ProgrammingError):
async for rec in cur.stream(query):
pass
async def test_str(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
assert "[IDLE]" in str(cur)
assert "[closed]" not in str(cur)
assert "[no result]" in str(cur)
n = []
for i in range(3):
async with await psycopg3.AsyncConnection.connect(dsn) as conn:
- async with await conn.cursor(binary=Format.as_pq(fmt)) as cur:
+ async with conn.cursor(binary=Format.as_pq(fmt)) as cur:
await cur.execute(faker.drop_stmt)
await cur.execute(faker.create_stmt)
await cur.executemany(faker.insert_stmt, faker.records)
@pytest.mark.asyncio
async def test_diag_from_commit_async(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute(
"""
create temp table test_deferred (
async def test_funny_name(aconn):
- cur = await aconn.cursor("1-2-3")
+ cur = aconn.cursor("1-2-3")
await cur.execute("select generate_series(1, 3) as bar")
assert await cur.fetchall() == [(1,), (2,), (3,)]
assert cur.name == "1-2-3"
async def test_description(aconn):
- cur = await aconn.cursor("foo")
+ cur = aconn.cursor("foo")
assert cur.name == "foo"
await cur.execute("select generate_series(1, 10) as bar")
assert len(cur.description) == 1
async def test_close(aconn, recwarn):
- cur = await aconn.cursor("foo")
+ cur = aconn.cursor("foo")
await cur.execute("select generate_series(1, 10) as bar")
await cur.close()
assert cur.closed
async def test_close_noop(aconn, recwarn):
- cur = await aconn.cursor("foo")
+ cur = aconn.cursor("foo")
await cur.close()
assert not recwarn
async def test_context(aconn, recwarn):
- async with await aconn.cursor("foo") as cur:
+ async with aconn.cursor("foo") as cur:
await cur.execute("select generate_series(1, 10) as bar")
assert cur.closed
async def test_warn_close(aconn, recwarn):
- cur = await aconn.cursor("foo")
+ cur = aconn.cursor("foo")
await cur.execute("select generate_series(1, 10) as bar")
del cur
assert ".close()" in str(recwarn.pop(ResourceWarning).message)
async def test_fetchone(aconn):
- async with await aconn.cursor("foo") as cur:
+ async with aconn.cursor("foo") as cur:
await cur.execute("select generate_series(1, %s) as bar", (2,))
assert await cur.fetchone() == (1,)
assert await cur.fetchone() == (2,)
async def test_fetchmany(aconn):
- async with await aconn.cursor("foo") as cur:
+ async with aconn.cursor("foo") as cur:
await cur.execute("select generate_series(1, %s) as bar", (5,))
assert await cur.fetchmany(3) == [(1,), (2,), (3,)]
assert await cur.fetchone() == (4,)
async def test_fetchall(aconn):
- async with await aconn.cursor("foo") as cur:
+ async with aconn.cursor("foo") as cur:
await cur.execute("select generate_series(1, %s) as bar", (3,))
assert await cur.fetchall() == [(1,), (2,), (3,)]
assert await cur.fetchall() == []
- async with await aconn.cursor("foo") as cur:
+ async with aconn.cursor("foo") as cur:
await cur.execute("select generate_series(1, %s) as bar", (3,))
assert await cur.fetchone() == (1,)
assert await cur.fetchall() == [(2,), (3,)]
async def test_rownumber(aconn):
- cur = await aconn.cursor("foo")
+ cur = aconn.cursor("foo")
assert cur.rownumber is None
await cur.execute("select 1 from generate_series(1, 42)")
async def test_iter(aconn):
- async with await aconn.cursor("foo") as cur:
+ async with aconn.cursor("foo") as cur:
await cur.execute("select generate_series(1, %s) as bar", (3,))
recs = []
async for rec in cur:
recs.append(rec)
assert recs == [(1,), (2,), (3,)]
- async with await aconn.cursor("foo") as cur:
+ async with aconn.cursor("foo") as cur:
await cur.execute("select generate_series(1, %s) as bar", (3,))
assert await cur.fetchone() == (1,)
recs = []
async def test_iter_rownumber(aconn):
- async with await aconn.cursor("foo") as cur:
+ async with aconn.cursor("foo") as cur:
await cur.execute("select generate_series(1, %s) as bar", (3,))
async for row in cur:
assert cur.rownumber == row[0]
async def test_itersize(aconn, acommands):
- async with await aconn.cursor("foo") as cur:
+ async with aconn.cursor("foo") as cur:
assert cur.itersize == 100
cur.itersize = 2
await cur.execute("select generate_series(1, %s) as bar", (3,))
async def test_scroll(aconn):
- cur = await aconn.cursor("tmp")
+ cur = aconn.cursor("tmp")
with pytest.raises(aconn.ProgrammingError):
await cur.scroll(0)
async def test_scrollable(aconn):
- curs = await aconn.cursor("foo")
+ curs = aconn.cursor("foo")
await curs.execute("select generate_series(0, 5)")
await curs.scroll(5)
for i in range(4, -1, -1):
async def test_non_scrollable(aconn):
- curs = await aconn.cursor("foo")
+ curs = aconn.cursor("foo")
await curs.execute("select generate_series(0, 5)", scrollable=False)
await curs.scroll(5)
with pytest.raises(aconn.OperationalError):
async def test_steal_cursor(aconn):
- cur1 = await aconn.cursor()
+ cur1 = aconn.cursor()
await cur1.execute(
"declare test cursor without hold for select generate_series(1, 6)"
)
- cur2 = await aconn.cursor("test")
+ cur2 = aconn.cursor("test")
# can call fetch without execute
assert await cur2.fetchone() == (1,)
assert await cur2.fetchmany(3) == [(2,), (3,), (4,)]
async def test_dont_prepare(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
for i in range(10):
await cur.execute("select %s::int", [i], prepare=False)
async def test_do_prepare(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
await cur.execute("select %s::int", [10], prepare=True)
await cur.execute("select count(*) from pg_prepared_statements")
assert await cur.fetchone() == (1,)
async def test_auto_prepare(aconn):
- cur = await aconn.cursor()
+ cur = aconn.cursor()
res = []
for i in range(10):
await cur.execute("select count(*) from pg_prepared_statements")
else:
async def f():
- cur = await conn.cursor()
+ cur = conn.cursor()
await cur.execute(sql, (value,))
return f()
else:
async def f():
- cur = await conn.cursor()
+ cur = conn.cursor()
await cur.execute(sql)
rows = await cur.fetchall()
return set(v for (v,) in rows)
assert commands.popall() == ["commit"]
# Case 1 (with a transaction already started)
- await (await aconn.cursor()).execute("select 1")
+ await aconn.cursor().execute("select 1")
assert commands.popall() == ["begin"]
async with aconn.transaction() as tx:
assert commands.popall() == ['savepoint "_pg3_1"']