@pytest.mark.slow
-def test_concurrent_filling(dsn, monkeypatch):
+def test_concurrent_filling(dsn, monkeypatch, retries):
delay_connection(monkeypatch, 0.1)
- t0 = time()
- times = []
-
- add_orig = pool.ConnectionPool._add_to_pool
def add_time(self, conn):
times.append(time() - t0)
add_orig(self, conn)
+ add_orig = pool.ConnectionPool._add_to_pool
monkeypatch.setattr(pool.ConnectionPool, "_add_to_pool", add_time)
- p = pool.ConnectionPool(dsn, minconn=5, num_workers=2)
- p.wait_ready(5.0)
- want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
- assert len(times) == len(want_times)
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.1), times
+ for retry in retries:
+ with retry:
+ times = []
+ t0 = time()
+
+ with pool.ConnectionPool(dsn, minconn=5, num_workers=2) as p:
+ p.wait_ready(1.0)
+ want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
+ assert len(times) == len(want_times)
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.1), times
@pytest.mark.slow
@pytest.mark.slow
-def test_queue(dsn):
- p = pool.ConnectionPool(dsn, minconn=2)
- results = []
-
+def test_queue(dsn, retries):
def worker(n):
t0 = time()
with p.connection() as conn:
t1 = time()
results.append((n, t1 - t0, pid))
- ts = []
- for i in range(6):
- t = Thread(target=worker, args=(i,))
- t.start()
- ts.append(t)
+ for retry in retries:
+ with retry:
+ results = []
+ ts = []
+ with pool.ConnectionPool(dsn, minconn=2) as p:
+ for i in range(6):
+ t = Thread(target=worker, args=(i,))
+ t.start()
+ ts.append(t)
- for t in ts:
- t.join()
+ for t in ts:
+ t.join()
- times = [item[1] for item in results]
- want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.2), times
+ times = [item[1] for item in results]
+ want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.2), times
- assert len(set(r[2] for r in results)) == 2
+ assert len(set(r[2] for r in results)) == 2
@pytest.mark.slow
p = pool.ConnectionPool(dsn, minconn=1)
conn = p.getconn()
- # Make the rollback fail
- orig_rollback = conn.rollback
-
def bad_rollback():
conn.pgconn.finish()
orig_rollback()
+ # Make the rollback fail
+ orig_rollback = conn.rollback
monkeypatch.setattr(conn, "rollback", bad_rollback)
pid = conn.pgconn.backend_pid
@pytest.mark.slow
-def test_grow(dsn, monkeypatch):
- p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, num_workers=3)
- p.wait_ready(5.0)
+def test_grow(dsn, monkeypatch, retries):
delay_connection(monkeypatch, 0.1)
- ts = []
- results = []
def worker(n):
t0 = time()
t1 = time()
results.append((n, t1 - t0))
- for i in range(6):
- t = Thread(target=worker, args=(i,))
- t.start()
- ts.append(t)
+ for retry in retries:
+ with retry:
+ with pool.ConnectionPool(
+ dsn, minconn=2, maxconn=4, num_workers=3
+ ) as p:
+ p.wait_ready(1.0)
+ ts = []
+ results = []
- for t in ts:
- t.join()
+ for i in range(6):
+ t = Thread(target=worker, args=(i,))
+ t.start()
+ ts.append(t)
+
+ for t in ts:
+ t.join()
- want_times = [0.2, 0.2, 0.3, 0.3, 0.4, 0.4]
- times = [item[1] for item in results]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.2), times
+ want_times = [0.2, 0.2, 0.3, 0.3, 0.4, 0.4]
+ times = [item[1] for item in results]
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.1), times
@pytest.mark.slow
from psycopg3.pool.tasks import ShrinkPool
- orig_run = ShrinkPool._run
results = []
def run_hacked(self, pool):
n1 = pool._nconns
results.append((n0, n1))
+ orig_run = ShrinkPool._run
monkeypatch.setattr(ShrinkPool, "_run", run_hacked)
p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, max_idle=0.2)
"""
Return a _connect_gen function delayed by the amount of seconds
"""
- connect_orig = psycopg3.Connection.connect
def connect_delay(*args, **kwargs):
t0 = time()
sleep(sec - (t1 - t0))
return rv
+ connect_orig = psycopg3.Connection.connect
monkeypatch.setattr(psycopg3.Connection, "connect", connect_delay)
@pytest.mark.slow
-async def test_concurrent_filling(dsn, monkeypatch):
+async def test_concurrent_filling(dsn, monkeypatch, retries):
delay_connection(monkeypatch, 0.1)
- t0 = time()
- times = []
-
- add_orig = pool.AsyncConnectionPool._add_to_pool
async def add_time(self, conn):
times.append(time() - t0)
await add_orig(self, conn)
+ add_orig = pool.AsyncConnectionPool._add_to_pool
monkeypatch.setattr(pool.AsyncConnectionPool, "_add_to_pool", add_time)
- p = pool.AsyncConnectionPool(dsn, minconn=5, num_workers=2)
- await p.wait_ready(5.0)
- want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
- assert len(times) == len(want_times)
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.2), times
- await p.close()
+ async for retry in retries:
+ with retry:
+ times = []
+ t0 = time()
+
+ async with pool.AsyncConnectionPool(
+ dsn, minconn=5, num_workers=2
+ ) as p:
+ await p.wait_ready(1.0)
+ want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
+ assert len(times) == len(want_times)
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.1), times
@pytest.mark.slow
@pytest.mark.slow
-async def test_queue(dsn):
- p = pool.AsyncConnectionPool(dsn, minconn=2)
- results = []
-
+async def test_queue(dsn, retries):
async def worker(n):
t0 = time()
async with p.connection() as conn:
t1 = time()
results.append((n, t1 - t0, pid))
- ts = [create_task(worker(i)) for i in range(6)]
- await asyncio.gather(*ts)
- await p.close()
+ async for retry in retries:
+ with retry:
+ results = []
+ async with pool.AsyncConnectionPool(dsn, minconn=2) as p:
+ ts = [create_task(worker(i)) for i in range(6)]
+ await asyncio.gather(*ts)
- times = [item[1] for item in results]
- want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.2), times
+ times = [item[1] for item in results]
+ want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.1), times
- assert len(set(r[2] for r in results)) == 2, results
+ assert len(set(r[2] for r in results)) == 2, results
@pytest.mark.slow
p = pool.AsyncConnectionPool(dsn, minconn=1)
conn = await p.getconn()
- # Make the rollback fail
- orig_rollback = conn.rollback
-
async def bad_rollback():
conn.pgconn.finish()
await orig_rollback()
+ # Make the rollback fail
+ orig_rollback = conn.rollback
monkeypatch.setattr(conn, "rollback", bad_rollback)
pid = conn.pgconn.backend_pid
@pytest.mark.slow
-async def test_grow(dsn, monkeypatch):
- p = pool.AsyncConnectionPool(dsn, minconn=2, maxconn=4, num_workers=3)
- await p.wait_ready(5.0)
+async def test_grow(dsn, monkeypatch, retries):
delay_connection(monkeypatch, 0.1)
- ts = []
- results = []
async def worker(n):
t0 = time()
t1 = time()
results.append((n, t1 - t0))
- ts = [create_task(worker(i)) for i in range(6)]
- await asyncio.gather(*ts)
- await p.close()
+ async for retry in retries:
+ with retry:
+ async with pool.AsyncConnectionPool(
+ dsn, minconn=2, maxconn=4, num_workers=3
+ ) as p:
+ await p.wait_ready(1.0)
+ ts = []
+ results = []
+
+ ts = [create_task(worker(i)) for i in range(6)]
+ await asyncio.gather(*ts)
+ await p.close()
- want_times = [0.2, 0.2, 0.3, 0.3, 0.4, 0.4]
- times = [item[1] for item in results]
- for got, want in zip(times, want_times):
- assert got == pytest.approx(want, 0.2), times
+ want_times = [0.2, 0.2, 0.3, 0.3, 0.4, 0.4]
+ times = [item[1] for item in results]
+ for got, want in zip(times, want_times):
+ assert got == pytest.approx(want, 0.1), times
@pytest.mark.slow
from psycopg3.pool.tasks import ShrinkPool
- orig_run = ShrinkPool._run_async
results = []
async def run_async_hacked(self, pool):
n1 = pool._nconns
results.append((n0, n1))
+ orig_run = ShrinkPool._run_async
monkeypatch.setattr(ShrinkPool, "_run_async", run_async_hacked)
p = pool.AsyncConnectionPool(dsn, minconn=2, maxconn=4, max_idle=0.2)
"""
Return a _connect_gen function delayed by the amount of seconds
"""
- connect_orig = psycopg3.AsyncConnection.connect
async def connect_delay(*args, **kwargs):
t0 = time()
await asyncio.sleep(sec - (t1 - t0))
return rv
+ connect_orig = psycopg3.AsyncConnection.connect
monkeypatch.setattr(psycopg3.AsyncConnection, "connect", connect_delay)