+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'test_sched_async.py'
+# DO NOT CHANGE! Change the original file instead.
import logging
-from time import time, sleep
+from time import time
from functools import partial
-from threading import Thread
+from contextlib import contextmanager
import pytest
+from ..utils import spawn, gather, sleep
+
try:
from psycopg_pool.sched import Scheduler
except ImportError:
@pytest.mark.slow
-@pytest.mark.timing
def test_sched():
s = Scheduler()
results = []
@pytest.mark.slow
-@pytest.mark.timing
-def test_sched_thread():
+def test_sched_task():
s = Scheduler()
- t = Thread(target=s.run, daemon=True)
- t.start()
+ t = spawn(s.run)
results = []
s.enter(0.3, None)
s.enter(0.2, partial(worker, 2))
- t.join()
+ gather(t)
t1 = time()
assert t1 - t0 == pytest.approx(0.3, 0.2)
@pytest.mark.slow
-@pytest.mark.timing
def test_sched_error(caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
s = Scheduler()
- t = Thread(target=s.run, daemon=True)
- t.start()
+ t = spawn(s.run)
results = []
s.enter(0.3, partial(worker, 2))
s.enter(0.2, error)
- t.join()
+ gather(t)
t1 = time()
assert t1 - t0 == pytest.approx(0.4, 0.1)
def test_empty_queue_timeout():
s = Scheduler()
- t0 = time()
- times = []
+ with timed_wait(s) as times:
+ s.EMPTY_QUEUE_TIMEOUT = 0.2
- wait_orig = s._event.wait
-
- def wait_logging(timeout=None):
- rv = wait_orig(timeout)
- times.append(time() - t0)
- return rv
+ t = spawn(s.run)
+ sleep(0.5)
+ s.enter(0.5, None)
+ gather(t)
- setattr(s._event, "wait", wait_logging)
- s.EMPTY_QUEUE_TIMEOUT = 0.2
-
- t = Thread(target=s.run)
- t.start()
- sleep(0.5)
- s.enter(0.5, None)
- t.join()
- times.append(time() - t0)
for got, want in zip(times, [0.2, 0.4, 0.5, 1.0]):
assert got == pytest.approx(want, 0.2), times
def test_first_task_rescheduling():
s = Scheduler()
+ with timed_wait(s) as times:
+ s.EMPTY_QUEUE_TIMEOUT = 0.1
+
+ s.enter(0.4, noop)
+ t = spawn(s.run)
+ s.enter(0.6, None) # this task doesn't trigger a reschedule
+ sleep(0.1)
+ s.enter(0.1, noop) # this triggers a reschedule
+ gather(t)
+
+ for got, want in zip(times, [0.1, 0.2, 0.4, 0.6, 0.6]):
+ assert got == pytest.approx(want, 0.2), times
+
+
+@contextmanager
+def timed_wait(s):
+ """
+ Hack the scheduler's Event.wait() function in order to log waited time.
+
+ The context is a list where the times are accumulated.
+ """
t0 = time()
times = []
wait_orig = s._event.wait
def wait_logging(timeout=None):
- rv = wait_orig(timeout)
- times.append(time() - t0)
+ args = (timeout,)
+
+ try:
+ rv = wait_orig(*args)
+ finally:
+ times.append(time() - t0)
return rv
setattr(s._event, "wait", wait_logging)
- s.EMPTY_QUEUE_TIMEOUT = 0.1
-
- s.enter(0.4, lambda: None)
- t = Thread(target=s.run)
- t.start()
- s.enter(0.6, None) # this task doesn't trigger a reschedule
- sleep(0.1)
- s.enter(0.1, lambda: None) # this triggers a reschedule
- t.join()
+
+ yield times
+
times.append(time() - t0)
- for got, want in zip(times, [0.1, 0.2, 0.4, 0.6, 0.6]):
- assert got == pytest.approx(want, 0.2), times
+
+
+def noop():
+ pass
-import asyncio
import logging
from time import time
-from asyncio import create_task
from functools import partial
+from contextlib import asynccontextmanager
import pytest
+from ..utils import spawn, gather, asleep
+
try:
from psycopg_pool.sched_async import AsyncScheduler
except ImportError:
# Tests should have been skipped if the package is not available
pass
-pytestmark = [pytest.mark.anyio, pytest.mark.timing]
+pytestmark = [pytest.mark.timing]
+if True: # ASYNC:
+ pytestmark.append(pytest.mark.anyio)
@pytest.mark.slow
-@pytest.mark.timing
async def test_sched():
s = AsyncScheduler()
results = []
@pytest.mark.slow
-@pytest.mark.timing
async def test_sched_task():
s = AsyncScheduler()
- t = create_task(s.run())
+ t = spawn(s.run)
results = []
await s.enter(0.3, None)
await s.enter(0.2, partial(worker, 2))
- await asyncio.gather(t)
+ await gather(t)
t1 = time()
assert t1 - t0 == pytest.approx(0.3, 0.2)
@pytest.mark.slow
-@pytest.mark.timing
async def test_sched_error(caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
s = AsyncScheduler()
- t = create_task(s.run())
+ t = spawn(s.run)
results = []
await s.enter(0.3, partial(worker, 2))
await s.enter(0.2, error)
- await asyncio.gather(t)
+ await gather(t)
t1 = time()
assert t1 - t0 == pytest.approx(0.4, 0.1)
async def test_empty_queue_timeout():
s = AsyncScheduler()
- t0 = time()
- times = []
+ async with timed_wait(s) as times:
+ s.EMPTY_QUEUE_TIMEOUT = 0.2
- wait_orig = s._event.wait
+ t = spawn(s.run)
+ await asleep(0.5)
+ await s.enter(0.5, None)
+ await gather(t)
- async def wait_logging():
- try:
- rv = await wait_orig()
- finally:
- times.append(time() - t0)
- return rv
-
- setattr(s._event, "wait", wait_logging)
- s.EMPTY_QUEUE_TIMEOUT = 0.2
-
- t = create_task(s.run())
- await asyncio.sleep(0.5)
- await s.enter(0.5, None)
- await asyncio.gather(t)
- times.append(time() - t0)
for got, want in zip(times, [0.2, 0.4, 0.5, 1.0]):
assert got == pytest.approx(want, 0.2), times
async def test_first_task_rescheduling():
s = AsyncScheduler()
+ async with timed_wait(s) as times:
+ s.EMPTY_QUEUE_TIMEOUT = 0.1
+
+ await s.enter(0.4, noop)
+ t = spawn(s.run)
+ await s.enter(0.6, None) # this task doesn't trigger a reschedule
+ await asleep(0.1)
+ await s.enter(0.1, noop) # this triggers a reschedule
+ await gather(t)
+
+ for got, want in zip(times, [0.1, 0.2, 0.4, 0.6, 0.6]):
+ assert got == pytest.approx(want, 0.2), times
+
+
+@asynccontextmanager
+async def timed_wait(s):
+ """
+ Hack the scheduler's Event.wait() function in order to log waited time.
+
+ The context is a list where the times are accumulated.
+ """
t0 = time()
times = []
wait_orig = s._event.wait
- async def wait_logging():
+ async def wait_logging(timeout=None):
+ if True: # ASYNC
+ args = ()
+ else:
+ args = (timeout,)
+
try:
- rv = await wait_orig()
+ rv = await wait_orig(*args)
finally:
times.append(time() - t0)
return rv
setattr(s._event, "wait", wait_logging)
- s.EMPTY_QUEUE_TIMEOUT = 0.1
- async def noop():
- pass
+ yield times
- await s.enter(0.4, noop)
- t = create_task(s.run())
- await s.enter(0.6, None) # this task doesn't trigger a reschedule
- await asyncio.sleep(0.1)
- await s.enter(0.1, noop) # this triggers a reschedule
- await asyncio.gather(t)
times.append(time() - t0)
- for got, want in zip(times, [0.1, 0.2, 0.4, 0.6, 0.6]):
- assert got == pytest.approx(want, 0.2), times
+
+
+async def noop():
+ pass
import gc
import re
import sys
+import asyncio
+import inspect
import operator
+from time import sleep as sleep # noqa: F401 -- re-export
from typing import Callable, Optional, Tuple
+from threading import Thread
from contextlib import contextmanager, asynccontextmanager
-from contextlib import closing as closing # noqa: F401 - re-export
+from contextlib import closing as closing # noqa: F401 -- re-export
import pytest
with pytest.raises(*args, **kwargs) as ex:
yield ex
return
+
+
+def spawn(f):
+ """
+ Equivalent to asyncio.create_task or creating and running a Thread.
+ """
+ if inspect.iscoroutinefunction(f):
+ return asyncio.create_task(f())
+ else:
+ t = Thread(target=f, daemon=True)
+ t.start()
+ return t
+
+
+def gather(*ts):
+ """
+ Equivalent to asyncio.gather or Thread.join()
+ """
+ if ts and inspect.isawaitable(ts[0]):
+ return asyncio.gather(*ts)
+ else:
+ for t in ts:
+ t.join()
+
+
+def asleep(s):
+ """
+ Equivalent to asyncio.sleep(), converted to time.sleep() by async_to_sync.
+ """
+ return asyncio.sleep(s)
"aconn_cls": "conn_cls",
"alist": "list",
"anext": "next",
+ "asleep": "sleep",
"apipeline": "pipeline",
"asynccontextmanager": "contextmanager",
"connection_async": "connection",
"cursor_async": "cursor",
"ensure_table_async": "ensure_table",
"find_insert_problem_async": "find_insert_problem",
+ "psycopg_pool.sched_async": "psycopg_pool.sched",
"wait_async": "wait",
"wait_conn_async": "wait_conn",
}
psycopg/psycopg/connection_async.py \
psycopg/psycopg/cursor_async.py \
psycopg_pool/psycopg_pool/sched_async.py \
+ tests/pool/test_sched_async.py \
tests/test_client_cursor_async.py \
tests/test_connection_async.py \
tests/test_copy_async.py \