import sys
import asyncio
import logging
-import weakref
from time import time
from collections import Counter
await p2.putconn(conn)
-async def test_del_no_warning(dsn, recwarn):
- p = pool.AsyncConnectionPool(dsn, minconn=2)
- async with p.connection() as conn:
- await conn.execute("select 1")
-
- await p.wait_ready()
- ref = weakref.ref(p)
- del p
- await asyncio.sleep(0.1) # TODO: I wish it wasn't needed
- assert not ref()
- assert not recwarn
-
-
async def test_closed_getconn(dsn):
p = pool.AsyncConnectionPool(dsn, minconn=1)
assert not p.closed