@pytest.mark.slow
+@pytest.mark.timing
def test_sched():
s = Scheduler()
results = []
@pytest.mark.slow
+@pytest.mark.timing
def test_sched_thread():
s = Scheduler()
t = Thread(target=s.run, daemon=True)
@pytest.mark.slow
+@pytest.mark.timing
def test_sched_error(caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
s = Scheduler()
@pytest.mark.slow
+@pytest.mark.timing
async def test_sched():
s = AsyncScheduler()
results = []
@pytest.mark.slow
+@pytest.mark.timing
async def test_sched_task():
s = AsyncScheduler()
t = create_task(s.run())
@pytest.mark.slow
+@pytest.mark.timing
async def test_sched_error(caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
s = AsyncScheduler()