loop.run_until_complete(
psycopg3.AsyncConnection.connect(*args, **kwargs)
)
+
+
+def test_broken_connection(aconn, loop):
+ cur = aconn.cursor()
+ with pytest.raises(psycopg3.DatabaseError):
+ loop.run_until_complete(
+ cur.execute("select pg_terminate_backend(pg_backend_pid())")
+ )
+ assert aconn.closed
monkeypatch.setattr(psycopg3.connection, "connect", fake_connect)
with pytest.raises((TypeError, psycopg3.ProgrammingError)):
psycopg3.Connection.connect(*args, **kwargs)
+
+
+def test_broken_connection(conn):
+ cur = conn.cursor()
+ with pytest.raises(psycopg3.DatabaseError):
+ cur.execute("select pg_terminate_backend(pg_backend_pid())")
+ assert conn.closed