from time import sleep, time
from threading import Thread, Event
from collections import Counter
-from typing import Any
+from typing import Any, List, Tuple
import pytest
for retry in retries:
with retry:
- times = []
+ times: List[float] = []
t0 = time()
with pool.ConnectionPool(dsn, min_size=5, num_workers=2) as p:
for retry in retries:
with retry:
- results = []
+ results: List[Tuple[int, float, int]] = []
with pool.ConnectionPool(dsn, min_size=2) as p:
p.wait()
ts = [Thread(target=worker, args=(i,)) for i in range(6)]
else:
success.append(True)
- errors = []
- success = []
+ errors: List[Exception] = []
+ success: List[bool] = []
with pool.ConnectionPool(dsn, min_size=1, max_waiting=3) as p:
p.wait()
for retry in retries:
with retry:
- results = []
- errors = []
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
ts = [Thread(target=worker, args=(i,)) for i in range(4)]
if timeout > 0.2:
raise
- results = []
+ results: List[int] = []
with pool.ConnectionPool(dsn, min_size=2) as p:
ts = [
for retry in retries:
with retry:
- results = []
- errors = []
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
ts = [Thread(target=worker, args=(i,)) for i in range(4)]
for retry in retries:
with retry:
p = pool.ConnectionPool(dsn, min_size=1)
- success = []
+ success: List[str] = []
t1 = Thread(target=w1)
t2 = Thread(target=w2)
dsn, min_size=2, max_size=4, num_workers=3
) as p:
p.wait(1.0)
- results = []
+ results: List[Tuple[int, float]] = []
ts = [Thread(target=worker, args=(i,)) for i in range(6)]
for t in ts:
from psycopg_pool.pool import ShrinkPool
- results = []
+ results: List[Tuple[int, int]] = []
def run_hacked(self, pool):
n0 = pool._nconns
with p.connection() as conn:
conn.execute("select pg_sleep(%s)", [t])
- size = []
+ size: List[int] = []
with pool.ConnectionPool(dsn, min_size=2, max_idle=0.2) as p:
s = Thread(target=sampler)
import logging
from time import time
from collections import Counter
-from typing import Any
+from typing import Any, List, Tuple
import pytest
async for retry in retries:
with retry:
- times = []
+ times: List[float] = []
t0 = time()
async with pool.AsyncConnectionPool(
async for retry in retries:
with retry:
- results = []
+ results: List[Tuple[int, float, int]] = []
async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
await p.wait()
ts = [create_task(worker(i)) for i in range(6)]
else:
success.append(True)
- errors = []
- success = []
+ errors: List[Exception] = []
+ success: List[bool] = []
async with pool.AsyncConnectionPool(dsn, min_size=1, max_waiting=3) as p:
await p.wait()
async for retry in retries:
with retry:
- results = []
- errors = []
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
async with pool.AsyncConnectionPool(
dsn, min_size=2, timeout=0.1
raise
async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
- results = []
+ results: List[int] = []
ts = [
create_task(worker(i, timeout))
for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
async for retry in retries:
with retry:
- results = []
- errors = []
+ results: List[Tuple[int, float, int]] = []
+ errors: List[Tuple[int, float, Exception]] = []
async with pool.AsyncConnectionPool(
dsn, min_size=2, timeout=0.1
async for retry in retries:
with retry:
p = pool.AsyncConnectionPool(dsn, min_size=1)
- success = []
+ success: List[str] = []
t1 = create_task(w1())
await asyncio.sleep(0.1)
) as p:
await p.wait(1.0)
ts = []
- results = []
+ results: List[Tuple[int, float]] = []
ts = [create_task(worker(i)) for i in range(6)]
await asyncio.gather(*ts)
from psycopg_pool.pool_async import ShrinkPool
- results = []
+ results: List[Tuple[int, int]] = []
async def run_hacked(self, pool):
n0 = pool._nconns
async with p.connection() as conn:
await conn.execute("select pg_sleep(%s)", [t])
- size = []
+ size: List[int] = []
async with pool.AsyncConnectionPool(dsn, min_size=2, max_idle=0.2) as p:
s = create_task(sampler())