assert n.payload == "2"
assert t1 - t0 == pytest.approx(0.5, abs=0.05)
+ t.join()
+
@pytest.mark.slow
def test_cancel(conn, retries):
conn.rollback()
assert cur.execute("select 1").fetchone()[0] == 1
+ t.join()
+
@pytest.mark.slow
def test_identify_closure(dsn, retries):
with retry:
conn = psycopg.connect(dsn)
conn2 = psycopg.connect(dsn)
-
- t0 = time.time()
- sel = selectors.DefaultSelector()
- sel.register(conn, selectors.EVENT_READ)
- t = threading.Thread(target=closer)
- t.start()
-
- assert sel.select(timeout=1.0)
- with pytest.raises(psycopg.OperationalError):
- conn.execute("select 1")
- t1 = time.time()
- assert 0.3 < t1 - t0 < 0.6
-
- conn.close()
- conn2.close()
+ try:
+ t0 = time.time()
+ sel = selectors.DefaultSelector()
+ sel.register(conn, selectors.EVENT_READ)
+ t = threading.Thread(target=closer)
+ t.start()
+ try:
+ assert sel.select(timeout=1.0)
+ with pytest.raises(psycopg.OperationalError):
+ conn.execute("select 1")
+ t1 = time.time()
+ assert 0.3 < t1 - t0 < 0.6
+ finally:
+ t.join()
+ finally:
+ conn.close()
+ conn2.close()
with retry:
aconn = await psycopg.AsyncConnection.connect(dsn)
conn2 = await psycopg.AsyncConnection.connect(dsn)
-
- t0 = time.time()
- ev = asyncio.Event()
- loop = asyncio.get_event_loop()
- loop.add_reader(aconn.fileno(), ev.set)
- create_task(closer())
-
- await asyncio.wait_for(ev.wait(), 1.0)
- with pytest.raises(psycopg.OperationalError):
- await aconn.execute("select 1")
- t1 = time.time()
- assert 0.3 < t1 - t0 < 0.6
-
- await aconn.close()
- await conn2.close()
+ try:
+ t0 = time.time()
+ ev = asyncio.Event()
+ loop = asyncio.get_event_loop()
+ loop.add_reader(aconn.fileno(), ev.set)
+ t = create_task(closer())
+ try:
+
+ await asyncio.wait_for(ev.wait(), 1.0)
+ with pytest.raises(psycopg.OperationalError):
+ await aconn.execute("select 1")
+ t1 = time.time()
+ assert 0.3 < t1 - t0 < 0.6
+ finally:
+ await asyncio.gather(*t)
+ finally:
+ await aconn.close()
+ await conn2.close()