from psycopg3 import AsyncConnection
from psycopg3.conninfo import conninfo_to_dict
+pytestmark = pytest.mark.asyncio
-def test_connect(dsn, loop):
- conn = loop.run_until_complete(AsyncConnection.connect(dsn))
+
+async def test_connect(dsn):
+ conn = await AsyncConnection.connect(dsn)
assert conn.status == conn.ConnStatus.OK
-def test_connect_bad(loop):
+@pytest.mark.asyncio
+async def test_connect_bad():
with pytest.raises(psycopg3.OperationalError):
- loop.run_until_complete(AsyncConnection.connect("dbname=nosuchdb"))
+ await AsyncConnection.connect("dbname=nosuchdb")
-def test_close(aconn, loop):
+async def test_close(aconn):
assert not aconn.closed
- loop.run_until_complete(aconn.close())
+ await aconn.close()
assert aconn.closed
assert aconn.status == aconn.ConnStatus.BAD
- loop.run_until_complete(aconn.close())
+ await aconn.close()
assert aconn.closed
assert aconn.status == aconn.ConnStatus.BAD
-def test_weakref(dsn, loop):
- conn = loop.run_until_complete(psycopg3.AsyncConnection.connect(dsn))
+async def test_weakref(dsn):
+ conn = await psycopg3.AsyncConnection.connect(dsn)
w = weakref.ref(conn)
- loop.run_until_complete(conn.close())
+ await conn.close()
del conn
gc.collect()
assert w() is None
-def test_commit(loop, aconn):
+async def test_commit(aconn):
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")
aconn.pgconn.exec_(b"begin")
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
aconn.pgconn.exec_(b"insert into foo values (1)")
- loop.run_until_complete(aconn.commit())
+ await aconn.commit()
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
res = aconn.pgconn.exec_(b"select id from foo where id = 1")
assert res.get_value(0, 0) == b"1"
- loop.run_until_complete(aconn.close())
+ await aconn.close()
with pytest.raises(psycopg3.OperationalError):
- loop.run_until_complete(aconn.commit())
+ await aconn.commit()
-def test_rollback(loop, aconn):
+async def test_rollback(aconn):
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")
aconn.pgconn.exec_(b"begin")
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
aconn.pgconn.exec_(b"insert into foo values (1)")
- loop.run_until_complete(aconn.rollback())
+ await aconn.rollback()
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
res = aconn.pgconn.exec_(b"select id from foo where id = 1")
assert res.ntuples == 0
- loop.run_until_complete(aconn.close())
+ await aconn.close()
with pytest.raises(psycopg3.OperationalError):
- loop.run_until_complete(aconn.rollback())
+ await aconn.rollback()
-def test_auto_transaction(loop, aconn):
+async def test_auto_transaction(aconn):
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")
cur = aconn.cursor()
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
- loop.run_until_complete(cur.execute("insert into foo values (1)"))
+ await cur.execute("insert into foo values (1)")
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
- loop.run_until_complete(aconn.commit())
+ await aconn.commit()
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
- loop.run_until_complete(cur.execute("select * from foo"))
- assert loop.run_until_complete(cur.fetchone()) == (1,)
+ await cur.execute("select * from foo")
+ assert await cur.fetchone() == (1,)
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
-def test_auto_transaction_fail(loop, aconn):
+async def test_auto_transaction_fail(aconn):
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")
cur = aconn.cursor()
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
- loop.run_until_complete(cur.execute("insert into foo values (1)"))
+ await cur.execute("insert into foo values (1)")
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
with pytest.raises(psycopg3.DatabaseError):
- loop.run_until_complete(cur.execute("meh"))
+ await cur.execute("meh")
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INERROR
- loop.run_until_complete(aconn.commit())
+ await aconn.commit()
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
- loop.run_until_complete(cur.execute("select * from foo"))
- assert loop.run_until_complete(cur.fetchone()) is None
+ await cur.execute("select * from foo")
+ assert await cur.fetchone() is None
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
-def test_autocommit(loop, aconn):
+async def test_autocommit(aconn):
assert aconn.autocommit is False
aconn.autocommit = True
assert aconn.autocommit
cur = aconn.cursor()
- loop.run_until_complete(cur.execute("select 1"))
- assert loop.run_until_complete(cur.fetchone()) == (1,)
+ await cur.execute("select 1")
+ assert await cur.fetchone() == (1,)
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
-def test_autocommit_connect(loop, dsn):
- aconn = loop.run_until_complete(
- psycopg3.AsyncConnection.connect(dsn, autocommit=True)
- )
+async def test_autocommit_connect(dsn):
+ aconn = await psycopg3.AsyncConnection.connect(dsn, autocommit=True)
assert aconn.autocommit
-def test_autocommit_intrans(loop, aconn):
+async def test_autocommit_intrans(aconn):
cur = aconn.cursor()
- loop.run_until_complete(cur.execute("select 1"))
- assert loop.run_until_complete(cur.fetchone()) == (1,)
+ await cur.execute("select 1")
+ assert await cur.fetchone() == (1,)
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
with pytest.raises(psycopg3.ProgrammingError):
aconn.autocommit = True
assert not aconn.autocommit
-def test_autocommit_inerror(loop, aconn):
+async def test_autocommit_inerror(aconn):
cur = aconn.cursor()
with pytest.raises(psycopg3.DatabaseError):
- loop.run_until_complete(cur.execute("meh"))
+ await cur.execute("meh")
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INERROR
with pytest.raises(psycopg3.ProgrammingError):
aconn.autocommit = True
assert not aconn.autocommit
-def test_autocommit_unknown(loop, aconn):
- loop.run_until_complete(aconn.close())
+async def test_autocommit_unknown(aconn):
+ await aconn.close()
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.UNKNOWN
with pytest.raises(psycopg3.ProgrammingError):
aconn.autocommit = True
assert not aconn.autocommit
-def test_get_encoding(aconn, loop):
+async def test_get_encoding(aconn):
cur = aconn.cursor()
- loop.run_until_complete(cur.execute("show client_encoding"))
- (enc,) = loop.run_until_complete(cur.fetchone())
+ await cur.execute("show client_encoding")
+ (enc,) = await cur.fetchone()
assert enc == aconn.encoding
-def test_set_encoding(aconn, loop):
+async def test_set_encoding(aconn):
newenc = "LATIN1" if aconn.encoding != "LATIN1" else "UTF8"
assert aconn.encoding != newenc
- loop.run_until_complete(aconn.set_client_encoding(newenc))
+ await aconn.set_client_encoding(newenc)
assert aconn.encoding == newenc
cur = aconn.cursor()
- loop.run_until_complete(cur.execute("show client_encoding"))
- (enc,) = loop.run_until_complete(cur.fetchone())
+ await cur.execute("show client_encoding")
+ (enc,) = await cur.fetchone()
assert enc == newenc
("euc-jp", "EUC_JP", "euc_jp"),
],
)
-def test_normalize_encoding(aconn, loop, enc, out, codec):
- loop.run_until_complete(aconn.set_client_encoding(enc))
+async def test_normalize_encoding(aconn, enc, out, codec):
+ await aconn.set_client_encoding(enc)
assert aconn.encoding == out
assert aconn.codec.name == codec
("euc-jp", "EUC_JP", "euc_jp"),
],
)
-def test_encoding_env_var(dsn, loop, monkeypatch, enc, out, codec):
+async def test_encoding_env_var(dsn, monkeypatch, enc, out, codec):
monkeypatch.setenv("PGCLIENTENCODING", enc)
- aconn = loop.run_until_complete(psycopg3.AsyncConnection.connect(dsn))
+ aconn = await psycopg3.AsyncConnection.connect(dsn)
assert aconn.encoding == out
assert aconn.codec.name == codec
-def test_set_encoding_unsupported(aconn, loop):
- loop.run_until_complete(aconn.set_client_encoding("EUC_TW"))
+async def test_set_encoding_unsupported(aconn):
+ await aconn.set_client_encoding("EUC_TW")
with pytest.raises(psycopg3.NotSupportedError):
- loop.run_until_complete(aconn.cursor().execute("select 1"))
+ await aconn.cursor().execute("select 1")
-def test_set_encoding_bad(aconn, loop):
+async def test_set_encoding_bad(aconn):
with pytest.raises(psycopg3.DatabaseError):
- loop.run_until_complete(aconn.set_client_encoding("WAT"))
+ await aconn.set_client_encoding("WAT")
@pytest.mark.parametrize(
("host=foo", {"user": None}, "host=foo"),
],
)
-def test_connect_args(monkeypatch, pgconn, loop, testdsn, kwargs, want):
+async def test_connect_args(monkeypatch, pgconn, testdsn, kwargs, want):
the_conninfo = None
def fake_connect(conninfo):
yield
monkeypatch.setattr(psycopg3.connection, "connect", fake_connect)
- loop.run_until_complete(
- psycopg3.AsyncConnection.connect(testdsn, **kwargs)
- )
+ await psycopg3.AsyncConnection.connect(testdsn, **kwargs)
assert conninfo_to_dict(the_conninfo) == conninfo_to_dict(want)
@pytest.mark.parametrize(
"args, kwargs", [((), {}), (("", ""), {}), ((), {"nosuchparam": 42})],
)
-def test_connect_badargs(monkeypatch, pgconn, loop, args, kwargs):
+async def test_connect_badargs(monkeypatch, pgconn, args, kwargs):
def fake_connect(conninfo):
return pgconn
yield
monkeypatch.setattr(psycopg3.connection, "connect", fake_connect)
with pytest.raises((TypeError, psycopg3.ProgrammingError)):
- loop.run_until_complete(
- psycopg3.AsyncConnection.connect(*args, **kwargs)
- )
+ await psycopg3.AsyncConnection.connect(*args, **kwargs)
-def test_broken_connection(aconn, loop):
+async def test_broken_connection(aconn):
cur = aconn.cursor()
with pytest.raises(psycopg3.DatabaseError):
- loop.run_until_complete(
- cur.execute("select pg_terminate_backend(pg_backend_pid())")
- )
+ await cur.execute("select pg_terminate_backend(pg_backend_pid())")
assert aconn.closed
-def test_notice_handlers(aconn, loop, caplog):
+async def test_notice_handlers(aconn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg3")
messages = []
severities = []
aconn.pgconn.exec_(b"set client_min_messages to notice")
cur = aconn.cursor()
- loop.run_until_complete(
- cur.execute(
- "do $$begin raise notice 'hello notice'; end$$ language plpgsql"
- )
+ await cur.execute(
+ "do $$begin raise notice 'hello notice'; end$$ language plpgsql"
)
assert messages == ["hello notice"]
assert severities == ["NOTICE"]
aconn.remove_notice_handler(cb1)
aconn.remove_notice_handler("the wrong thing")
- loop.run_until_complete(
- cur.execute(
- "do $$begin raise warning 'hello warning'; end$$ language plpgsql"
- )
+ await cur.execute(
+ "do $$begin raise warning 'hello warning'; end$$ language plpgsql"
)
assert len(caplog.records) == 3
assert messages == ["hello notice"]
import psycopg3
+pytestmark = pytest.mark.asyncio
-def test_close(aconn, loop):
+
+async def test_close(aconn):
cur = aconn.cursor()
assert not cur.closed
- loop.run_until_complete(cur.close())
+ await cur.close()
assert cur.closed
with pytest.raises(psycopg3.OperationalError):
- loop.run_until_complete(cur.execute("select 'foo'"))
+ await cur.execute("select 'foo'")
- loop.run_until_complete(cur.close())
+ await cur.close()
assert cur.closed
-def test_weakref(aconn, loop):
+async def test_weakref(aconn):
cur = aconn.cursor()
w = weakref.ref(cur)
- loop.run_until_complete(cur.close())
+ await cur.close()
del cur
gc.collect()
assert w() is None
-def test_status(aconn, loop):
+async def test_status(aconn):
cur = aconn.cursor()
assert cur.status is None
- loop.run_until_complete(cur.execute("reset all"))
+ await cur.execute("reset all")
assert cur.status == cur.ExecStatus.COMMAND_OK
- loop.run_until_complete(cur.execute("select 1"))
+ await cur.execute("select 1")
assert cur.status == cur.ExecStatus.TUPLES_OK
- loop.run_until_complete(cur.close())
+ await cur.close()
assert cur.status is None
-def test_execute_many_results(aconn, loop):
+async def test_execute_many_results(aconn):
cur = aconn.cursor()
assert cur.nextset() is None
- rv = loop.run_until_complete(cur.execute("select 'foo'; select 'bar'"))
+ rv = await cur.execute("select 'foo'; select 'bar'")
assert rv is cur
assert len(cur._results) == 2
assert cur.pgresult.get_value(0, 0) == b"foo"
assert cur.pgresult.get_value(0, 0) == b"bar"
assert cur.nextset() is None
- loop.run_until_complete(cur.close())
+ await cur.close()
assert cur.nextset() is None
-def test_execute_sequence(aconn, loop):
+async def test_execute_sequence(aconn):
cur = aconn.cursor()
- rv = loop.run_until_complete(
- cur.execute("select %s, %s, %s", [1, "foo", None])
- )
+ rv = await cur.execute("select %s, %s, %s", [1, "foo", None])
assert rv is cur
assert len(cur._results) == 1
assert cur.pgresult.get_value(0, 0) == b"1"
@pytest.mark.parametrize("query", ["", " ", ";"])
-def test_execute_empty_query(aconn, loop, query):
+async def test_execute_empty_query(aconn, query):
cur = aconn.cursor()
- loop.run_until_complete(cur.execute(query))
+ await cur.execute(query)
assert cur.status == cur.ExecStatus.EMPTY_QUERY
with pytest.raises(psycopg3.ProgrammingError):
- loop.run_until_complete(cur.fetchone())
+ await cur.fetchone()
-def test_fetchone(aconn, loop):
+async def test_fetchone(aconn):
cur = aconn.cursor()
- loop.run_until_complete(cur.execute("select %s, %s, %s", [1, "foo", None]))
+ await cur.execute("select %s, %s, %s", [1, "foo", None])
assert cur.pgresult.fformat(0) == 0
- row = loop.run_until_complete(cur.fetchone())
+ row = await cur.fetchone()
assert row[0] == 1
assert row[1] == "foo"
assert row[2] is None
- row = loop.run_until_complete(cur.fetchone())
+ row = await cur.fetchone()
assert row is None
-def test_execute_binary_result(aconn, loop):
+async def test_execute_binary_result(aconn):
cur = aconn.cursor(binary=True)
- loop.run_until_complete(cur.execute("select %s, %s", ["foo", None]))
+ await cur.execute("select %s, %s", ["foo", None])
assert cur.pgresult.fformat(0) == 1
- row = loop.run_until_complete(cur.fetchone())
+ row = await cur.fetchone()
assert row[0] == "foo"
assert row[1] is None
- row = loop.run_until_complete(cur.fetchone())
+ row = await cur.fetchone()
assert row is None
@pytest.mark.parametrize("encoding", ["utf8", "latin9"])
-def test_query_encode(aconn, loop, encoding):
- loop.run_until_complete(aconn.set_client_encoding(encoding))
+async def test_query_encode(aconn, encoding):
+ await aconn.set_client_encoding(encoding)
cur = aconn.cursor()
- loop.run_until_complete(cur.execute("select '\u20ac'"))
- (res,) = loop.run_until_complete(cur.fetchone())
+ await cur.execute("select '\u20ac'")
+ (res,) = await cur.fetchone()
assert res == "\u20ac"
-def test_query_badenc(aconn, loop):
- loop.run_until_complete(aconn.set_client_encoding("latin1"))
+async def test_query_badenc(aconn):
+ await aconn.set_client_encoding("latin1")
cur = aconn.cursor()
with pytest.raises(UnicodeEncodeError):
- loop.run_until_complete(cur.execute("select '\u20ac'"))
+ await cur.execute("select '\u20ac'")
@pytest.fixture(scope="session")
-def _execmany(svcconn):
+async def _execmany(svcconn):
cur = svcconn.cursor()
cur.execute(
"""
@pytest.fixture(scope="function")
-def execmany(svcconn, _execmany):
+async def execmany(svcconn, _execmany):
cur = svcconn.cursor()
cur.execute("truncate table execmany")
svcconn.commit()
-def test_executemany(aconn, loop, execmany):
+async def test_executemany(aconn, execmany):
cur = aconn.cursor()
- loop.run_until_complete(
- cur.executemany(
- "insert into execmany(num, data) values (%s, %s)",
- [(10, "hello"), (20, "world")],
- )
+ await cur.executemany(
+ "insert into execmany(num, data) values (%s, %s)",
+ [(10, "hello"), (20, "world")],
)
- loop.run_until_complete(
- cur.execute("select num, data from execmany order by 1")
- )
- rv = loop.run_until_complete(cur.fetchall())
+ await cur.execute("select num, data from execmany order by 1")
+ rv = await cur.fetchall()
assert rv == [(10, "hello"), (20, "world")]
-def test_executemany_name(aconn, loop, execmany):
+async def test_executemany_name(aconn, execmany):
cur = aconn.cursor()
- loop.run_until_complete(
- cur.executemany(
- "insert into execmany(num, data) values (%(num)s, %(data)s)",
- [
- {"num": 11, "data": "hello", "x": 1},
- {"num": 21, "data": "world"},
- ],
- )
- )
- loop.run_until_complete(
- cur.execute("select num, data from execmany order by 1")
+ await cur.executemany(
+ "insert into execmany(num, data) values (%(num)s, %(data)s)",
+ [{"num": 11, "data": "hello", "x": 1}, {"num": 21, "data": "world"}],
)
- rv = loop.run_until_complete(cur.fetchall())
+ await cur.execute("select num, data from execmany order by 1")
+ rv = await cur.fetchall()
assert rv == [(11, "hello"), (21, "world")]
@pytest.mark.xfail
-def test_executemany_rowcount(aconn, loop, execmany):
+async def test_executemany_rowcount(aconn, execmany):
cur = aconn.cursor()
- loop.run_until_complete(
- cur.executemany(
- "insert into execmany(num, data) values (%s, %s)",
- [(10, "hello"), (20, "world")],
- )
+ await cur.executemany(
+ "insert into execmany(num, data) values (%s, %s)",
+ [(10, "hello"), (20, "world")],
)
assert cur.rowcount == 2
"wat (%s, %s)",
],
)
-def test_executemany_badquery(aconn, loop, query):
+async def test_executemany_badquery(aconn, query):
cur = aconn.cursor()
with pytest.raises(psycopg3.DatabaseError):
- loop.run_until_complete(
- cur.executemany(query, [(10, "hello"), (20, "world")])
- )
+ await cur.executemany(query, [(10, "hello"), (20, "world")])