import time
import queue
import pytest
-import selectors
import threading
import subprocess as sp
from typing import List
@pytest.mark.slow
def test_identify_closure(dsn, retries):
def closer():
- time.sleep(0.3)
+ time.sleep(0.2)
conn2.execute(
"select pg_terminate_backend(%s)", [conn.pgconn.backend_pid]
)
conn = psycopg.connect(dsn)
conn2 = psycopg.connect(dsn)
try:
+ t = threading.Thread(target=closer)
+ t.start()
t0 = time.time()
- with selectors.DefaultSelector() as sel:
- sel.register(conn, selectors.EVENT_READ)
- t = threading.Thread(target=closer)
- t.start()
- try:
- assert sel.select(timeout=1.0)
- with pytest.raises(psycopg.OperationalError):
- conn.execute("select 1")
- t1 = time.time()
- assert 0.3 < t1 - t0 < 0.6
- finally:
- t.join()
+ try:
+ with pytest.raises(psycopg.OperationalError):
+ conn.execute("select pg_sleep(1.0)")
+ t1 = time.time()
+ assert 0.2 < t1 - t0 < 0.4
+ finally:
+ t.join()
finally:
conn.close()
conn2.close()
@pytest.mark.slow
async def test_identify_closure(dsn, retries):
async def closer():
- await asyncio.sleep(0.3)
+ await asyncio.sleep(0.2)
await conn2.execute(
"select pg_terminate_backend(%s)", [aconn.pgconn.backend_pid]
)
aconn = await psycopg.AsyncConnection.connect(dsn)
conn2 = await psycopg.AsyncConnection.connect(dsn)
try:
- t0 = time.time()
- ev = asyncio.Event()
- loop = asyncio.get_event_loop()
- loop.add_reader(aconn.fileno(), ev.set)
t = create_task(closer())
+ t0 = time.time()
try:
-
- await asyncio.wait_for(ev.wait(), 1.0)
with pytest.raises(psycopg.OperationalError):
- await aconn.execute("select 1")
+ await aconn.execute("select pg_sleep(1.0)")
t1 = time.time()
- assert 0.3 < t1 - t0 < 0.6
+ assert 0.2 < t1 - t0 < 0.4
finally:
- await asyncio.gather(*t)
+ await asyncio.gather(t)
finally:
await aconn.close()
await conn2.close()