@pytest.mark.slow
-def test_closed_queue(dsn):
- p = pool.ConnectionPool(dsn, min_size=1)
- success = []
-
+def test_closed_queue(dsn, retries):
def w1():
with p.connection() as conn:
assert (
pass
success.append("w2")
- t1 = Thread(target=w1)
- t2 = Thread(target=w2)
- t1.start()
- sleep(0.1)
- t2.start()
- p.close()
- t1.join()
- t2.join()
- assert len(success) == 2
+ for retry in retries:
+ with retry:
+ p = pool.ConnectionPool(dsn, min_size=1)
+ success = []
+
+ t1 = Thread(target=w1)
+ t2 = Thread(target=w2)
+ t1.start()
+ sleep(0.1)
+ t2.start()
+ p.close()
+ t1.join()
+ t2.join()
+ assert len(success) == 2
@pytest.mark.slow
@pytest.mark.slow
-def test_reconnect(proxy, caplog, monkeypatch):
+def test_reconnect(proxy, caplog, monkeypatch, retries):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
- proxy.start()
- with pool.ConnectionPool(proxy.client_dsn, min_size=1) as p:
- p.wait(2.0)
- proxy.stop()
-
- with pytest.raises(psycopg.OperationalError):
- with p.connection() as conn:
- conn.execute("select 1")
-
- sleep(1.0)
- proxy.start()
- p.wait()
-
- with p.connection() as conn:
- conn.execute("select 1")
+ for retry in retries:
+ with retry:
+ proxy.start()
+ with pool.ConnectionPool(proxy.client_dsn, min_size=1) as p:
+ p.wait(2.0)
+ proxy.stop()
- assert "BAD" in caplog.messages[0]
- times = [rec.created for rec in caplog.records]
- assert times[1] - times[0] < 0.05
- deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
- assert len(deltas) == 3
- want = 0.1
- for delta in deltas:
- assert delta == pytest.approx(want, 0.05), deltas
- want *= 2
+ with pytest.raises(psycopg.OperationalError):
+ with p.connection() as conn:
+ conn.execute("select 1")
+
+ sleep(1.0)
+ proxy.start()
+ p.wait()
+
+ with p.connection() as conn:
+ conn.execute("select 1")
+
+ assert "BAD" in caplog.messages[0]
+ times = [rec.created for rec in caplog.records]
+ assert times[1] - times[0] < 0.05
+ deltas = [
+ times[i + 1] - times[i] for i in range(1, len(times) - 1)
+ ]
+ assert len(deltas) == 3
+ want = 0.1
+ for delta in deltas:
+ assert delta == pytest.approx(want, 0.05), deltas
+ want *= 2
@pytest.mark.slow
@pytest.mark.slow
-async def test_closed_queue(dsn):
- p = pool.AsyncConnectionPool(dsn, min_size=1)
- success = []
-
+async def test_closed_queue(dsn, retries):
async def w1():
async with p.connection() as conn:
res = await conn.execute("select 1 from pg_sleep(0.2)")
pass
success.append("w2")
- t1 = create_task(w1())
- await asyncio.sleep(0.1)
- t2 = create_task(w2())
- await p.close()
- await asyncio.gather(t1, t2)
- assert len(success) == 2
+ async for retry in retries:
+ with retry:
+ p = pool.AsyncConnectionPool(dsn, min_size=1)
+ success = []
+
+ t1 = create_task(w1())
+ await asyncio.sleep(0.1)
+ t2 = create_task(w2())
+ await p.close()
+ await asyncio.gather(t1, t2)
+ assert len(success) == 2
@pytest.mark.slow
@pytest.mark.slow
-async def test_reconnect(proxy, caplog, monkeypatch):
+async def test_reconnect(proxy, caplog, monkeypatch, retries):
assert pool.base.ConnectionAttempt.INITIAL_DELAY == 1.0
assert pool.base.ConnectionAttempt.DELAY_JITTER == 0.1
monkeypatch.setattr(pool.base.ConnectionAttempt, "INITIAL_DELAY", 0.1)
monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
- proxy.start()
- async with pool.AsyncConnectionPool(proxy.client_dsn, min_size=1) as p:
- await p.wait(2.0)
- proxy.stop()
-
- with pytest.raises(psycopg.OperationalError):
- async with p.connection() as conn:
- await conn.execute("select 1")
-
- await asyncio.sleep(1.0)
- proxy.start()
- await p.wait()
-
- async with p.connection() as conn:
- await conn.execute("select 1")
+ async for retry in retries:
+ with retry:
+ proxy.start()
+ async with pool.AsyncConnectionPool(
+ proxy.client_dsn, min_size=1
+ ) as p:
+ await p.wait(2.0)
+ proxy.stop()
- recs = [
- r
- for r in caplog.records
- if r.name.startswith("psycopg") and r.levelno >= logging.WARNING
- ]
- assert "BAD" in recs[0].message
- times = [rec.created for rec in recs]
- assert times[1] - times[0] < 0.05
- deltas = [times[i + 1] - times[i] for i in range(1, len(times) - 1)]
- assert len(deltas) == 3
- want = 0.1
- for delta in deltas:
- assert delta == pytest.approx(want, 0.05), deltas
- want *= 2
+ with pytest.raises(psycopg.OperationalError):
+ async with p.connection() as conn:
+ await conn.execute("select 1")
+
+ await asyncio.sleep(1.0)
+ proxy.start()
+ await p.wait()
+
+ async with p.connection() as conn:
+ await conn.execute("select 1")
+
+ recs = [
+ r
+ for r in caplog.records
+ if r.name.startswith("psycopg")
+ and r.levelno >= logging.WARNING
+ ]
+ assert "BAD" in recs[0].message
+ times = [rec.created for rec in recs]
+ assert times[1] - times[0] < 0.05
+ deltas = [
+ times[i + 1] - times[i] for i in range(1, len(times) - 1)
+ ]
+ assert len(deltas) == 3
+ want = 0.1
+ for delta in deltas:
+ assert delta == pytest.approx(want, 0.05), deltas
+ want *= 2
@pytest.mark.slow
@pytest.mark.slow
-def test_identify_closure(conn, dsn):
- conn2 = psycopg.connect(dsn)
-
+def test_identify_closure(dsn, retries):
def closer():
time.sleep(0.3)
conn2.execute(
"select pg_terminate_backend(%s)", [conn.pgconn.backend_pid]
)
- t0 = time.time()
- sel = selectors.DefaultSelector()
- sel.register(conn, selectors.EVENT_READ)
- t = threading.Thread(target=closer)
- t.start()
-
- assert sel.select(timeout=1.0)
- with pytest.raises(psycopg.OperationalError):
- conn.execute("select 1")
- t1 = time.time()
- assert 0.3 < t1 - t0 < 0.6
+ for retry in retries:
+ with retry:
+ conn = psycopg.connect(dsn)
+ conn2 = psycopg.connect(dsn)
+
+ t0 = time.time()
+ sel = selectors.DefaultSelector()
+ sel.register(conn, selectors.EVENT_READ)
+ t = threading.Thread(target=closer)
+ t.start()
+
+ assert sel.select(timeout=1.0)
+ with pytest.raises(psycopg.OperationalError):
+ conn.execute("select 1")
+ t1 = time.time()
+ assert 0.3 < t1 - t0 < 0.6
@pytest.mark.slow
-async def test_identify_closure(aconn, dsn):
- conn2 = await psycopg.AsyncConnection.connect(dsn)
-
+async def test_identify_closure(dsn, retries):
async def closer():
await asyncio.sleep(0.3)
await conn2.execute(
"select pg_terminate_backend(%s)", [aconn.pgconn.backend_pid]
)
- t0 = time.time()
- ev = asyncio.Event()
- loop = asyncio.get_event_loop()
- loop.add_reader(aconn.fileno(), ev.set)
- create_task(closer())
-
- await asyncio.wait_for(ev.wait(), 1.0)
- with pytest.raises(psycopg.OperationalError):
- await aconn.execute("select 1")
- t1 = time.time()
- assert 0.3 < t1 - t0 < 0.6
+ async for retry in retries:
+ with retry:
+ aconn = await psycopg.AsyncConnection.connect(dsn)
+ conn2 = await psycopg.AsyncConnection.connect(dsn)
+
+ t0 = time.time()
+ ev = asyncio.Event()
+ loop = asyncio.get_event_loop()
+ loop.add_reader(aconn.fileno(), ev.set)
+ create_task(closer())
+
+ await asyncio.wait_for(ev.wait(), 1.0)
+ with pytest.raises(psycopg.OperationalError):
+ await aconn.execute("select 1")
+ t1 = time.time()
+ assert 0.3 < t1 - t0 < 0.6
@pytest.mark.parametrize(
"row_factory", ["tuple_row", "dict_row", "namedtuple_row"]
)
-def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory):
+def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory, retries):
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
row_factory = getattr(rows, row_factory)
- n = []
- gc_collect()
- for i in range(3):
- with psycopg.connect(dsn) as conn:
- with conn.cursor(binary=fmt_out, row_factory=row_factory) as cur:
- cur.execute(faker.drop_stmt)
- cur.execute(faker.create_stmt)
- cur.executemany(faker.insert_stmt, faker.records)
- cur.execute(faker.select_stmt)
-
- if fetch == "one":
- while 1:
- tmp = cur.fetchone()
- if tmp is None:
- break
- elif fetch == "many":
- while 1:
- tmp = cur.fetchmany(3)
- if not tmp:
- break
- elif fetch == "all":
- cur.fetchall()
- elif fetch == "iter":
- for rec in cur:
- pass
-
- tmp = None
-
- del cur, conn
- gc_collect()
- n.append(len(gc.get_objects()))
-
- assert (
- n[0] == n[1] == n[2]
- ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+ for retry in retries:
+ with retry:
+ n = []
+ gc_collect()
+ for i in range(3):
+ with psycopg.connect(dsn) as conn:
+ with conn.cursor(
+ binary=fmt_out, row_factory=row_factory
+ ) as cur:
+ cur.execute(faker.drop_stmt)
+ cur.execute(faker.create_stmt)
+ cur.executemany(faker.insert_stmt, faker.records)
+ cur.execute(faker.select_stmt)
+
+ if fetch == "one":
+ while 1:
+ tmp = cur.fetchone()
+ if tmp is None:
+ break
+ elif fetch == "many":
+ while 1:
+ tmp = cur.fetchmany(3)
+ if not tmp:
+ break
+ elif fetch == "all":
+ cur.fetchall()
+ elif fetch == "iter":
+ for rec in cur:
+ pass
+
+ tmp = None
+
+ del cur, conn
+ gc_collect()
+ n.append(len(gc.get_objects()))
+
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
def my_row_factory(cursor):
@pytest.mark.parametrize(
"row_factory", ["tuple_row", "dict_row", "namedtuple_row"]
)
-async def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory):
+async def test_leak(dsn, faker, fmt, fmt_out, fetch, row_factory, retries):
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
row_factory = getattr(rows, row_factory)
- n = []
- gc_collect()
- for i in range(3):
- async with await psycopg.AsyncConnection.connect(dsn) as conn:
- async with conn.cursor(
- binary=fmt_out, row_factory=row_factory
- ) as cur:
- await cur.execute(faker.drop_stmt)
- await cur.execute(faker.create_stmt)
- await cur.executemany(faker.insert_stmt, faker.records)
- await cur.execute(faker.select_stmt)
-
- if fetch == "one":
- while 1:
- tmp = await cur.fetchone()
- if tmp is None:
- break
- elif fetch == "many":
- while 1:
- tmp = await cur.fetchmany(3)
- if not tmp:
- break
- elif fetch == "all":
- await cur.fetchall()
- elif fetch == "iter":
- async for rec in cur:
- pass
-
- tmp = None
-
- del cur, conn
- gc_collect()
- n.append(len(gc.get_objects()))
-
- assert (
- n[0] == n[1] == n[2]
- ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+ async for retry in retries:
+ with retry:
+ n = []
+ gc_collect()
+ for i in range(3):
+ async with await psycopg.AsyncConnection.connect(dsn) as conn:
+ async with conn.cursor(
+ binary=fmt_out, row_factory=row_factory
+ ) as cur:
+ await cur.execute(faker.drop_stmt)
+ await cur.execute(faker.create_stmt)
+ await cur.executemany(faker.insert_stmt, faker.records)
+ await cur.execute(faker.select_stmt)
+
+ if fetch == "one":
+ while 1:
+ tmp = await cur.fetchone()
+ if tmp is None:
+ break
+ elif fetch == "many":
+ while 1:
+ tmp = await cur.fetchmany(3)
+ if not tmp:
+ break
+ elif fetch == "all":
+ await cur.fetchall()
+ elif fetch == "iter":
+ async for rec in cur:
+ pass
+
+ tmp = None
+
+ del cur, conn
+ gc_collect()
+ n.append(len(gc.get_objects()))
+
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"