+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'test_pool_async.py'
+# DO NOT CHANGE! Change the original file instead.
import logging
import weakref
-from time import sleep, time
-from threading import Thread, Event
+from time import time
from typing import Any, Dict, List, Tuple
import pytest
import psycopg
from psycopg.pq import TransactionStatus
from psycopg.rows import class_row, Row, TupleRow
-from psycopg._compat import Counter, assert_type
+from psycopg._compat import assert_type, Counter
+
+from ..utils import Event, spawn, gather, sleep, is_alive, is_async
try:
import psycopg_pool as pool
pass
-def test_package_version(mypy):
- cp = mypy.run_on_source(
- """\
-from psycopg_pool import __version__
-assert __version__
-"""
- )
- assert not cp.stdout
-
-
def test_defaults(dsn):
with pool.ConnectionPool(dsn) as p:
assert p.min_size == p.max_size == 4
def test_generic_connection_type(dsn):
def set_autocommit(conn: psycopg.Connection[Any]) -> None:
- conn.autocommit = True
+ conn.set_autocommit(True)
class MyConnection(psycopg.Connection[Row]):
pass
connection_class=MyConnection[MyRow],
kwargs=dict(row_factory=class_row(MyRow)),
configure=set_autocommit,
- ) as p1, p1.connection() as conn1:
- (row1,) = conn1.execute("select 1 as x").fetchall()
+ ) as p1:
+ with p1.connection() as conn1:
+ cur1 = conn1.execute("select 1 as x")
+ (row1,) = cur1.fetchall()
assert_type(p1, pool.ConnectionPool[MyConnection[MyRow]])
assert_type(conn1, MyConnection[MyRow])
assert_type(row1, MyRow)
with pool.ConnectionPool(dsn, connection_class=MyConnection[TupleRow]) as p2:
with p2.connection() as conn2:
- (row2,) = conn2.execute("select 2 as y").fetchall()
+ cur2 = conn2.execute("select 2 as y")
+ (row2,) = cur2.fetchall()
assert_type(p2, pool.ConnectionPool[MyConnection[TupleRow]])
assert_type(conn2, MyConnection[TupleRow])
assert_type(row2, TupleRow)
def test_non_generic_connection_type(dsn):
def set_autocommit(conn: psycopg.Connection[Any]) -> None:
- conn.autocommit = True
+ conn.set_autocommit(True)
class MyConnection(psycopg.Connection[MyRow]):
def __init__(self, *args: Any, **kwargs: Any):
dsn, connection_class=MyConnection, configure=set_autocommit
) as p1:
with p1.connection() as conn1:
- (row1,) = conn1.execute("select 1 as x").fetchall()
+ cur1 = conn1.execute("select 1 as x")
+ (row1,) = cur1.fetchall()
assert_type(p1, pool.ConnectionPool[MyConnection])
assert_type(conn1, MyConnection)
assert_type(row1, MyRow)
conn.execute("set default_transaction_read_only to on")
with pool.ConnectionPool(dsn, min_size=1, configure=configure) as p:
- p.wait()
+ p.wait(timeout=1.0)
with p.connection() as conn:
assert inits == 1
res = conn.execute("show default_transaction_read_only")
assert resets == 1
with p.connection() as conn:
- with conn.execute("show timezone") as cur:
- assert cur.fetchone() == ("UTC",)
+ cur = conn.execute("show timezone")
+ assert cur.fetchone() == ("UTC",)
p.wait()
assert resets == 2
results: List[Tuple[int, float, int]] = []
with pool.ConnectionPool(dsn, min_size=2) as p:
p.wait()
- ts = [Thread(target=worker, args=(i,)) for i in range(6)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ ts = [spawn(worker, args=(i,)) for i in range(6)]
+ gather(*ts)
times = [item[1] for item in results]
want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
for got, want in zip(times, want_times):
assert got == pytest.approx(want, 0.1), times
- assert len(set(r[2] for r in results)) == 2, results
+ assert len(set((r[2] for r in results))) == 2, results
@pytest.mark.slow
with pool.ConnectionPool(dsn, min_size=1, max_waiting=3) as p:
p.wait()
ev = Event()
- t = Thread(target=worker, args=(0.3, ev))
- t.start()
+ spawn(worker, args=(0.3, ev))
ev.wait()
- ts = [Thread(target=worker, args=(0.1,)) for i in range(4)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ ts = [spawn(worker, args=(0.1,)) for i in range(4)]
+ gather(*ts)
assert len(success) == 4
assert len(errors) == 1
errors: List[Tuple[int, float, Exception]] = []
with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [Thread(target=worker, args=(i,)) for i in range(4)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ ts = [spawn(worker, args=(i,)) for i in range(4)]
+ gather(*ts)
assert len(results) == 2
assert len(errors) == 2
if timeout > 0.2:
raise
- results: List[int] = []
-
with pool.ConnectionPool(dsn, min_size=2) as p:
+ results: List[int] = []
ts = [
- Thread(target=worker, args=(i, timeout))
- for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
+ spawn(worker, args=(i, timeout))
+ for (i, timeout) in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ gather(*ts)
+
sleep(0.2)
assert set(results) == set([0, 1, 3, 4])
assert len(p._pool) == 2 # no connection was lost
errors: List[Tuple[int, float, Exception]] = []
with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [Thread(target=worker, args=(i,)) for i in range(4)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ ts = [spawn(worker, args=(i,)) for i in range(4)]
+ gather(*ts)
assert len(results) == 3
assert len(errors) == 1
with p.connection() as conn2:
assert conn2.info.backend_pid == pid
assert conn2.info.transaction_status == TransactionStatus.IDLE
- assert not conn2.execute(
+ cur = conn2.execute(
"select 1 from pg_class where relname = 'test_intrans_rollback'"
- ).fetchone()
+ )
+ assert not cur.fetchone()
assert len(caplog.records) == 1
assert "INTRANS" in caplog.records[0].message
assert "BAD" in caplog.records[2].message
-def test_close_no_threads(dsn):
+def test_close_no_tasks(dsn):
p = pool.ConnectionPool(dsn)
- assert p._sched_runner and p._sched_runner.is_alive()
+ assert p._sched_runner and is_alive(p._sched_runner)
workers = p._workers[:]
assert workers
for t in workers:
- assert t.is_alive()
+ assert is_alive(t)
p.close()
assert p._sched_runner is None
assert not p._workers
for t in workers:
- assert not t.is_alive()
+ assert not is_alive(t)
def test_putconn_no_pool(conn_cls, dsn):
@pytest.mark.slow
-def test_del_stop_threads(dsn):
+@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+def test_del_stops_threads(dsn):
p = pool.ConnectionPool(dsn)
assert p._sched_runner is not None
ts = [p._sched_runner] + p._workers
del p
sleep(0.1)
for t in ts:
- assert not t.is_alive()
+ assert not is_alive(t), t
def test_closed_getconn(dsn):
p.wait()
success: List[str] = []
- t1 = Thread(target=w1)
- t1.start()
+ t1 = spawn(w1)
# Wait until w1 has received a connection
e1.wait()
- t2 = Thread(target=w2)
- t2.start()
+ t2 = spawn(w2)
# Wait until w2 is in the queue
ensure_waiting(p)
-
- p.close(0)
+ p.close()
# Wait for the workers to finish
e2.set()
- t1.join()
- t2.join()
+ gather(t1, t2)
assert len(success) == 2
with pytest.raises(pool.PoolClosed, match="is not open yet"):
p.getconn()
- with pytest.raises(pool.PoolClosed):
+ with pytest.raises(pool.PoolClosed, match="is not open yet"):
with p.connection():
pass
with p.connection() as conn:
cur = conn.execute("select 1")
assert cur.fetchone() == (1,)
-
finally:
p.close()
with p.connection() as conn:
cur = conn.execute("select 1")
assert cur.fetchone() == (1,)
-
finally:
p.close()
@pytest.mark.parametrize(
"min_size, want_times",
[
- (2, [0.25, 0.25, 0.35, 0.45, 0.50, 0.50, 0.60, 0.70]),
- (0, [0.35, 0.45, 0.55, 0.60, 0.65, 0.70, 0.80, 0.85]),
+ (2, [0.25, 0.25, 0.35, 0.45, 0.5, 0.5, 0.6, 0.7]),
+ (0, [0.35, 0.45, 0.55, 0.6, 0.65, 0.7, 0.8, 0.85]),
],
)
def test_grow(dsn, monkeypatch, min_size, want_times):
with pool.ConnectionPool(dsn, min_size=min_size, max_size=4, num_workers=3) as p:
p.wait(1.0)
results: List[Tuple[int, float]] = []
-
- ts = [Thread(target=worker, args=(i,)) for i in range(len(want_times))]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ ts = [spawn(worker, args=(i,)) for i in range(len(want_times))]
+ gather(*ts)
times = [item[1] for item in results]
for got, want in zip(times, want_times):
p.wait(5.0)
assert p.max_idle == 0.2
- ts = [Thread(target=worker, args=(i,)) for i in range(4)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ ts = [spawn(worker, args=(i,)) for i in range(4)]
+ gather(*ts)
+
sleep(1)
assert results == [(4, 4), (4, 3), (3, 2), (2, 2), (2, 2)]
@pytest.mark.slow
@pytest.mark.timing
-def test_reconnect_failure(proxy):
+@pytest.mark.parametrize("async_cb", [True, False])
+def test_reconnect_failure(proxy, async_cb):
+ if async_cb and (not is_async(__name__)):
+ pytest.skip("async test only")
+
proxy.start()
t1 = None
- def failed(pool):
- assert pool.name == "this-one"
- nonlocal t1
- t1 = time()
+ if async_cb:
+
+ def failed(pool):
+ assert pool.name == "this-one"
+ nonlocal t1
+ t1 = time()
+
+ else:
+
+ def failed(pool):
+ assert pool.name == "this-one"
+ nonlocal t1
+ t1 = time()
with pool.ConnectionPool(
proxy.client_dsn,
with pool.ConnectionPool(
proxy.client_dsn, min_size=4, reconnect_timeout=1.0, reconnect_failed=failed
) as p:
- assert ev.wait(timeout=2)
+ ev.wait(2.0)
with pytest.raises(pool.PoolTimeout):
with p.connection(timeout=0.5) as conn:
pass
ev.clear()
- assert ev.wait(timeout=2)
+ ev.wait(2.0)
proxy.start()
# Checking the pool will empty it
p.check()
- assert ev.wait(timeout=2)
+ ev.wait(2.0)
assert len(p._pool) == 0
# Allow to connect again
size: List[int] = []
with pool.ConnectionPool(dsn, min_size=2, max_idle=0.2) as p:
- s = Thread(target=sampler)
- s.start()
+ s = spawn(sampler)
sleep(0.3)
- c = Thread(target=client, args=(0.4,))
- c.start()
+
+ c = spawn(client, args=(0.4,))
sleep(0.2)
p.resize(4)
sleep(0.6)
- s.join()
+ gather(s, c)
assert size == [2, 1, 3, 4, 3, 2, 2]
pid = conn.info.backend_pid
p.wait(1.0)
- pids = set(conn.info.backend_pid for conn in p._pool)
+ pids = set((conn.info.backend_pid for conn in p._pool))
assert pid in pids
conn.close()
p.check()
assert len(caplog.records) == 1
p.wait(1.0)
- pids2 = set(conn.info.backend_pid for conn in p._pool)
+ pids2 = set((conn.info.backend_pid for conn in p._pool))
assert len(pids & pids2) == 3
assert pid not in pids2
assert stats["pool_available"] == 2
assert stats["requests_waiting"] == 0
- ts = [Thread(target=worker, args=(i,)) for i in range(3)]
- for t in ts:
- t.start()
+ ts = [spawn(worker, args=(i,)) for i in range(3)]
sleep(0.1)
stats = p.get_stats()
- for t in ts:
- t.join()
+ gather(*ts)
assert stats["pool_min"] == 2
assert stats["pool_max"] == 4
assert stats["pool_size"] == 3
assert stats["requests_waiting"] == 0
p.wait(2.0)
- ts = [Thread(target=worker, args=(i,)) for i in range(7)]
- for t in ts:
- t.start()
+ ts = [spawn(worker, args=(i,)) for i in range(7)]
sleep(0.1)
stats = p.get_stats()
- for t in ts:
- t.join()
+ gather(*ts)
assert stats["pool_min"] == 2
assert stats["pool_max"] == 4
assert stats["pool_size"] == 4
with pool.ConnectionPool(dsn, min_size=3) as p:
p.wait(2.0)
- ts = [Thread(target=worker, args=(i,)) for i in range(7)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ ts = [spawn(worker, args=(i,)) for i in range(7)]
+ gather(*ts)
stats = p.get_stats()
assert stats["requests_num"] == 7
assert stats["requests_queued"] == 4
assert stats["connections_num"] == 3
assert stats.get("connections_errors", 0) == 0
assert stats.get("connections_lost", 0) == 0
- assert 600 <= stats["connections_ms"] < 1200
+ assert 580 <= stats["connections_ms"] < 1200
proxy.stop()
p.check()
with pool.ConnectionPool(dsn, min_size=5, max_size=10) as p:
p.wait()
- ts = [Thread(target=worker) for i in range(50)]
- for t in ts:
- t.start()
- for t in ts:
- t.join()
+ ts = [spawn(worker) for i in range(50)]
+ gather(*ts)
p.wait()
assert len(p._pool) < 7
logger.addHandler(handler)
try:
with pool.ConnectionPool(dsn, min_size=4, open=True) as p:
- try:
- p.wait(timeout=2)
- finally:
- print(p.get_stats())
+ p.wait(timeout=2)
finally:
logger.removeHandler(handler)
logger.setLevel(old_level)
+@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+def test_cancellation_in_queue(dsn):
+ # https://github.com/psycopg/psycopg/issues/509
+
+ nconns = 3
+
+ with pool.ConnectionPool(dsn, min_size=nconns, timeout=1) as p:
+ p.wait()
+
+ got_conns = []
+ ev = Event()
+
+ def worker(i):
+ try:
+ logging.info("worker %s started", i)
+ nonlocal got_conns
+
+ with p.connection() as conn:
+ logging.info("worker %s got conn", i)
+ cur = conn.execute("select 1")
+ assert cur.fetchone() == (1,)
+
+ got_conns.append(conn)
+ if len(got_conns) >= nconns:
+ ev.set()
+
+ sleep(5)
+ except BaseException as ex:
+ logging.info("worker %s stopped: %r", i, ex)
+ raise
+
+ # Start tasks taking up all the connections and getting in the queue
+ tasks = [spawn(worker, (i,)) for i in range(nconns * 3)]
+
+ # wait until the pool has served all the connections and clients are queued.
+ ev.wait(3.0)
+ for i in range(10):
+ if p.get_stats().get("requests_queued", 0):
+ break
+ else:
+ sleep(0.1)
+ else:
+ pytest.fail("no client got in the queue")
+
+ [task.cancel() for task in reversed(tasks)]
+ gather(*tasks, return_exceptions=True, timeout=1.0)
+
+ stats = p.get_stats()
+ assert stats["pool_available"] == 3
+ assert stats.get("requests_waiting", 0) == 0
+
+ with p.connection() as conn:
+ cur = conn.execute("select 1")
+ assert cur.fetchone() == (1,)
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds
-import asyncio
import logging
+import weakref
from time import time
from typing import Any, Dict, List, Tuple
-from asyncio import create_task
import pytest
from psycopg.rows import class_row, Row, TupleRow
from psycopg._compat import assert_type, Counter
+from ..utils import AEvent, spawn, gather, asleep, is_alive, is_async
+
try:
import psycopg_pool as pool
except ImportError:
# Tests should have been skipped if the package is not available
pass
-pytestmark = [pytest.mark.anyio]
+if True: # ASYNC
+ pytestmark = [pytest.mark.anyio]
async def test_defaults(dsn):
async with pool.AsyncConnectionPool(
proxy.client_dsn, min_size=1, num_workers=1
) as p:
- await asyncio.sleep(0.5)
+ await asleep(0.5)
assert not p._pool
proxy.start()
results: List[Tuple[int, float, int]] = []
async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
await p.wait()
- ts = [create_task(worker(i)) for i in range(6)]
- await asyncio.gather(*ts)
+ ts = [spawn(worker, args=(i,)) for i in range(6)]
+ await gather(*ts)
times = [item[1] for item in results]
want_times = [0.2, 0.2, 0.4, 0.4, 0.6, 0.6]
async with p.connection():
if ev:
ev.set()
- await asyncio.sleep(t)
+ await asleep(t)
except pool.TooManyRequests as e:
errors.append(e)
else:
async with pool.AsyncConnectionPool(dsn, min_size=1, max_waiting=3) as p:
await p.wait()
- ev = asyncio.Event()
- create_task(worker(0.3, ev))
+ ev = AEvent()
+ spawn(worker, args=(0.3, ev))
await ev.wait()
- ts = [create_task(worker(0.1)) for i in range(4)]
- await asyncio.gather(*ts)
+ ts = [spawn(worker, args=(0.1,)) for i in range(4)]
+ await gather(*ts)
assert len(success) == 4
assert len(errors) == 1
errors: List[Tuple[int, float, Exception]] = []
async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [create_task(worker(i)) for i in range(4)]
- await asyncio.gather(*ts)
+ ts = [spawn(worker, args=(i,)) for i in range(4)]
+ await gather(*ts)
assert len(results) == 2
assert len(errors) == 2
async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
results: List[int] = []
ts = [
- create_task(worker(i, timeout))
+ spawn(worker, args=(i, timeout))
for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
]
- await asyncio.gather(*ts)
+ await gather(*ts)
- await asyncio.sleep(0.2)
+ await asleep(0.2)
assert set(results) == set([0, 1, 3, 4])
assert len(p._pool) == 2 # no connection was lost
errors: List[Tuple[int, float, Exception]] = []
async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
- ts = [create_task(worker(i)) for i in range(4)]
- await asyncio.gather(*ts)
+ ts = [spawn(worker, args=(i,)) for i in range(4)]
+ await gather(*ts)
assert len(results) == 3
assert len(errors) == 1
async def test_close_no_tasks(dsn):
p = pool.AsyncConnectionPool(dsn)
- assert p._sched_runner and not p._sched_runner.done()
- assert p._workers
+ assert p._sched_runner and is_alive(p._sched_runner)
workers = p._workers[:]
+ assert workers
for t in workers:
- assert not t.done()
+ assert is_alive(t)
await p.close()
assert p._sched_runner is None
assert not p._workers
for t in workers:
- assert t.done()
+ assert not is_alive(t)
async def test_putconn_no_pool(aconn_cls, dsn):
await p2.putconn(conn)
+async def test_del_no_warning(dsn, recwarn):
+ p = pool.AsyncConnectionPool(dsn, min_size=2)
+ async with p.connection() as conn:
+ await conn.execute("select 1")
+
+ await p.wait()
+ ref = weakref.ref(p)
+ del p
+ assert not ref()
+ assert not recwarn, [str(w.message) for w in recwarn.list]
+
+
+@pytest.mark.slow
+@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+async def test_del_stops_threads(dsn):
+ p = pool.AsyncConnectionPool(dsn)
+ assert p._sched_runner is not None
+ ts = [p._sched_runner] + p._workers
+ del p
+ await asleep(0.1)
+ for t in ts:
+ assert not is_alive(t), t
+
+
async def test_closed_getconn(dsn):
p = pool.AsyncConnectionPool(dsn, min_size=1)
assert not p.closed
except pool.PoolClosed:
success.append("w2")
- e1 = asyncio.Event()
- e2 = asyncio.Event()
+ e1 = AEvent()
+ e2 = AEvent()
p = pool.AsyncConnectionPool(dsn, min_size=1)
await p.wait()
success: List[str] = []
- t1 = create_task(w1())
+ t1 = spawn(w1)
# Wait until w1 has received a connection
await e1.wait()
- t2 = create_task(w2())
+ t2 = spawn(w2)
# Wait until w2 is in the queue
await ensure_waiting(p)
await p.close()
# Wait for the workers to finish
e2.set()
- await asyncio.gather(t1, t2)
+ await gather(t1, t2)
assert len(success) == 2
async def test_open_explicit(dsn):
p = pool.AsyncConnectionPool(dsn, open=False)
assert p.closed
- with pytest.raises(pool.PoolClosed):
+ with pytest.raises(pool.PoolClosed, match="is not open yet"):
await p.getconn()
with pytest.raises(pool.PoolClosed, match="is not open yet"):
await conn.execute("select 1")
await p.close()
assert p._sched_runner is None
+ assert not p._workers
with pytest.raises(psycopg.OperationalError, match="cannot be reused"):
await p.open()
dsn, min_size=min_size, max_size=4, num_workers=3
) as p:
await p.wait(1.0)
- ts = []
results: List[Tuple[int, float]] = []
-
- ts = [create_task(worker(i)) for i in range(len(want_times))]
- await asyncio.gather(*ts)
+ ts = [spawn(worker, args=(i,)) for i in range(len(want_times))]
+ await gather(*ts)
times = [item[1] for item in results]
for got, want in zip(times, want_times):
await p.wait(5.0)
assert p.max_idle == 0.2
- ts = [create_task(worker(i)) for i in range(4)]
- await asyncio.gather(*ts)
+ ts = [spawn(worker, args=(i,)) for i in range(4)]
+ await gather(*ts)
- await asyncio.sleep(1)
+ await asleep(1)
assert results == [(4, 4), (4, 3), (3, 2), (2, 2), (2, 2)]
async with p.connection() as conn:
await conn.execute("select 1")
- await asyncio.sleep(1.0)
+ await asleep(1.0)
proxy.start()
await p.wait()
@pytest.mark.timing
@pytest.mark.parametrize("async_cb", [True, False])
async def test_reconnect_failure(proxy, async_cb):
+ if async_cb and not is_async(__name__):
+ pytest.skip("async test only")
+
proxy.start()
t1 = None
await conn.execute("select 1")
t0 = time()
- await asyncio.sleep(1.5)
+ await asleep(1.5)
assert t1
assert t1 - t0 == pytest.approx(1.0, 0.1)
assert p._nconns == 0
# in grow mode. See issue #370.
proxy.stop()
- ev = asyncio.Event()
+ ev = AEvent()
def failed(pool):
ev.set()
async with pool.AsyncConnectionPool(
proxy.client_dsn, min_size=4, reconnect_timeout=1.0, reconnect_failed=failed
) as p:
- await asyncio.wait_for(ev.wait(), 2.0)
+ await ev.wait_timeout(2.0)
with pytest.raises(pool.PoolTimeout):
async with p.connection(timeout=0.5) as conn:
pass
ev.clear()
- await asyncio.wait_for(ev.wait(), 2.0)
+ await ev.wait_timeout(2.0)
proxy.start()
@pytest.mark.slow
async def test_refill_on_check(proxy):
proxy.start()
- ev = asyncio.Event()
+ ev = AEvent()
def failed(pool):
ev.set()
# Checking the pool will empty it
await p.check()
- await asyncio.wait_for(ev.wait(), 2.0)
+ await ev.wait_timeout(2.0)
assert len(p._pool) == 0
# Allow to connect again
counts = Counter[int]()
for i in range(8):
async with p.connection() as conn:
- await asyncio.sleep(0.1)
+ await asleep(0.1)
counts[id(conn)] += 1
assert len(counts) == 4
@pytest.mark.timing
async def test_resize(dsn):
async def sampler():
- await asyncio.sleep(0.05) # ensure sampling happens after shrink check
+ await asleep(0.05) # ensure sampling happens after shrink check
while True:
- await asyncio.sleep(0.2)
+ await asleep(0.2)
if p.closed:
break
size.append(len(p._pool))
size: List[int] = []
async with pool.AsyncConnectionPool(dsn, min_size=2, max_idle=0.2) as p:
- s = create_task(sampler())
+ s = spawn(sampler)
- await asyncio.sleep(0.3)
+ await asleep(0.3)
- c = create_task(client(0.4))
+ c = spawn(client, args=(0.4,))
- await asyncio.sleep(0.2)
+ await asleep(0.2)
await p.resize(4)
assert p.min_size == 4
assert p.max_size == 4
- await asyncio.sleep(0.4)
+ await asleep(0.4)
await p.resize(2)
assert p.min_size == 2
assert p.max_size == 2
- await asyncio.sleep(0.6)
+ await asleep(0.6)
- await asyncio.gather(s, c)
+ await gather(s, c)
assert size == [2, 1, 3, 4, 3, 2, 2]
@pytest.mark.crdb_skip("backend pid")
async def test_max_lifetime(dsn):
async with pool.AsyncConnectionPool(dsn, min_size=1, max_lifetime=0.2) as p:
- await asyncio.sleep(0.1)
+ await asleep(0.1)
pids = []
for i in range(5):
async with p.connection() as conn:
pids.append(conn.info.backend_pid)
- await asyncio.sleep(0.2)
+ await asleep(0.2)
assert pids[0] == pids[1] != pids[4], pids
pid = conn.info.backend_pid
async with p.connection() as conn:
assert conn.info.backend_pid == pid
- await asyncio.sleep(0.3)
+ await asleep(0.3)
await p.check()
async with p.connection() as conn:
assert conn.info.backend_pid != pid
assert stats["pool_available"] == 2
assert stats["requests_waiting"] == 0
- ts = [create_task(worker(i)) for i in range(3)]
- await asyncio.sleep(0.1)
+ ts = [spawn(worker, args=(i,)) for i in range(3)]
+ await asleep(0.1)
stats = p.get_stats()
- await asyncio.gather(*ts)
+ await gather(*ts)
assert stats["pool_min"] == 2
assert stats["pool_max"] == 4
assert stats["pool_size"] == 3
assert stats["requests_waiting"] == 0
await p.wait(2.0)
- ts = [create_task(worker(i)) for i in range(7)]
- await asyncio.sleep(0.1)
+ ts = [spawn(worker, args=(i,)) for i in range(7)]
+ await asleep(0.1)
stats = p.get_stats()
- await asyncio.gather(*ts)
+ await gather(*ts)
assert stats["pool_min"] == 2
assert stats["pool_max"] == 4
assert stats["pool_size"] == 4
async with pool.AsyncConnectionPool(dsn, min_size=3) as p:
await p.wait(2.0)
- ts = [create_task(worker(i)) for i in range(7)]
- await asyncio.gather(*ts)
+ ts = [spawn(worker, args=(i,)) for i in range(7)]
+ await gather(*ts)
stats = p.get_stats()
assert stats["requests_num"] == 7
assert stats["requests_queued"] == 4
proxy.stop()
await p.check()
- await asyncio.sleep(0.1)
+ await asleep(0.1)
stats = p.get_stats()
assert stats["connections_num"] > 3
assert stats["connections_errors"] > 0
async def worker():
async with p.connection():
- await asyncio.sleep(0.002)
+ await asleep(0.002)
async with pool.AsyncConnectionPool(dsn, min_size=5, max_size=10) as p:
await p.wait()
- ts = [create_task(worker()) for i in range(50)]
- await asyncio.gather(*ts)
+ ts = [spawn(worker) for i in range(50)]
+ await gather(*ts)
await p.wait()
assert len(p._pool) < 7
logger.setLevel(old_level)
+@pytest.mark.skipif(not is_async(__name__), reason="async test only")
async def test_cancellation_in_queue(dsn):
# https://github.com/psycopg/psycopg/issues/509
await p.wait()
got_conns = []
- ev = asyncio.Event()
+ ev = AEvent()
async def worker(i):
try:
if len(got_conns) >= nconns:
ev.set()
- await asyncio.sleep(5)
+ await asleep(5)
except BaseException as ex:
logging.info("worker %s stopped: %r", i, ex)
raise
# Start tasks taking up all the connections and getting in the queue
- tasks = [asyncio.ensure_future(worker(i)) for i in range(nconns * 3)]
+ tasks = [spawn(worker, (i,)) for i in range(nconns * 3)]
# wait until the pool has served all the connections and clients are queued.
- await asyncio.wait_for(ev.wait(), 3.0)
+ await ev.wait_timeout(3.0)
for i in range(10):
if p.get_stats().get("requests_queued", 0):
break
else:
- await asyncio.sleep(0.1)
+ await asleep(0.1)
else:
pytest.fail("no client got in the queue")
[task.cancel() for task in reversed(tasks)]
- await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), 1.0)
+ await gather(*tasks, return_exceptions=True, timeout=1.0)
stats = p.get_stats()
assert stats["pool_available"] == 3
t0 = time()
rv = await connect_orig(*args, **kwargs)
t1 = time()
- await asyncio.sleep(max(0, sec - (t1 - t0)))
+ await asleep(max(0, sec - (t1 - t0)))
return rv
connect_orig = psycopg.AsyncConnection.connect
async def ensure_waiting(p, num=1):
+ """
+ Wait until there are at least *num* clients waiting in the queue.
+ """
while len(p._waiting) < num:
- await asyncio.sleep(0)
+ await asleep(0)