From: Daniele Varrazzo Date: Mon, 11 Sep 2023 16:10:17 +0000 (+0100) Subject: refactor(pool): generate Scheduler from AsyncScheduler X-Git-Tag: pool-3.2.0~12^2~37 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c3d303d8b3b642ad45b053096e3f8e4111012600;p=thirdparty%2Fpsycopg.git refactor(pool): generate Scheduler from AsyncScheduler --- diff --git a/psycopg_pool/psycopg_pool/_task.py b/psycopg_pool/psycopg_pool/_task.py new file mode 100644 index 000000000..419765014 --- /dev/null +++ b/psycopg_pool/psycopg_pool/_task.py @@ -0,0 +1,27 @@ +""" +Task for Scheduler and AsyncScheduler +""" + +# Copyright (C) 2023 The Psycopg Team + +from typing import Any, Callable, Optional, NamedTuple + + +class Task(NamedTuple): + time: float + action: Optional[Callable[[], Any]] + + def __eq__(self, other: "Task") -> Any: # type: ignore[override] + return self.time == other.time + + def __lt__(self, other: "Task") -> Any: # type: ignore[override] + return self.time < other.time + + def __le__(self, other: "Task") -> Any: # type: ignore[override] + return self.time <= other.time + + def __gt__(self, other: "Task") -> Any: # type: ignore[override] + return self.time > other.time + + def __ge__(self, other: "Task") -> Any: # type: ignore[override] + return self.time >= other.time diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index 90a7abb02..9bd1766db 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -22,9 +22,9 @@ from psycopg.pq import TransactionStatus from psycopg.rows import TupleRow from .base import ConnectionAttempt, BasePool -from .sched import AsyncScheduler from .errors import PoolClosed, PoolTimeout, TooManyRequests from ._compat import Deque +from .sched_async import AsyncScheduler logger = logging.getLogger("psycopg.pool") diff --git a/psycopg_pool/psycopg_pool/sched.py b/psycopg_pool/psycopg_pool/sched.py index ca2600732..2c6f3c0e8 100644 --- a/psycopg_pool/psycopg_pool/sched.py +++ b/psycopg_pool/psycopg_pool/sched.py @@ -1,3 +1,6 @@ +# WARNING: this file is auto-generated by 'async_to_sync.py' +# from the original file 'sched_async.py' +# DO NOT CHANGE! Change the original file instead. """ A minimal scheduler to schedule tasks run in the future. @@ -7,47 +10,29 @@ front of the one currently running and `Scheduler.run()` can be left running without any task scheduled. Tasks are called "Task", not "Event", here, because we actually make use of -`threading.Event` and the two would be confusing. +`[threading/asyncio].Event` and the two would be confusing. """ # Copyright (C) 2021 The Psycopg Team -import asyncio import logging -import threading from time import monotonic from heapq import heappush, heappop -from typing import Any, Callable, List, Optional, NamedTuple +from typing import Any, Callable, List, Optional -logger = logging.getLogger(__name__) - - -class Task(NamedTuple): - time: float - action: Optional[Callable[[], Any]] - - def __eq__(self, other: "Task") -> Any: # type: ignore[override] - return self.time == other.time +from threading import RLock as Lock, Event - def __lt__(self, other: "Task") -> Any: # type: ignore[override] - return self.time < other.time +from ._task import Task - def __le__(self, other: "Task") -> Any: # type: ignore[override] - return self.time <= other.time - - def __gt__(self, other: "Task") -> Any: # type: ignore[override] - return self.time > other.time - - def __ge__(self, other: "Task") -> Any: # type: ignore[override] - return self.time >= other.time +logger = logging.getLogger(__name__) class Scheduler: def __init__(self) -> None: """Initialize a new instance, passing the time and delay functions.""" self._queue: List[Task] = [] - self._lock = threading.RLock() - self._event = threading.Event() + self._lock = Lock() + self._event = Event() EMPTY_QUEUE_TIMEOUT = 600.0 @@ -106,72 +91,3 @@ class Scheduler: else: # Block for the expected timeout or until a new task scheduled self._event.wait(timeout=delay) - - -class AsyncScheduler: - def __init__(self) -> None: - """Initialize a new instance, passing the time and delay functions.""" - self._queue: List[Task] = [] - self._lock = asyncio.Lock() - self._event = asyncio.Event() - - EMPTY_QUEUE_TIMEOUT = 600.0 - - async def enter(self, delay: float, action: Optional[Callable[[], Any]]) -> Task: - """Enter a new task in the queue delayed in the future. - - Schedule a `!None` to stop the execution. - """ - time = monotonic() + delay - return await self.enterabs(time, action) - - async def enterabs(self, time: float, action: Optional[Callable[[], Any]]) -> Task: - """Enter a new task in the queue at an absolute time. - - Schedule a `!None` to stop the execution. - """ - task = Task(time, action) - async with self._lock: - heappush(self._queue, task) - first = self._queue[0] is task - - if first: - self._event.set() - - return task - - async def run(self) -> None: - """Execute the events scheduled.""" - q = self._queue - while True: - async with self._lock: - now = monotonic() - task = q[0] if q else None - if task: - if task.time <= now: - heappop(q) - else: - delay = task.time - now - task = None - else: - delay = self.EMPTY_QUEUE_TIMEOUT - self._event.clear() - - if task: - if not task.action: - break - try: - await task.action() - except Exception as e: - logger.warning( - "scheduled task run %s failed: %s: %s", - task.action, - e.__class__.__name__, - e, - ) - else: - # Block for the expected timeout or until a new task scheduled - try: - await asyncio.wait_for(self._event.wait(), delay) - except asyncio.TimeoutError: - pass diff --git a/psycopg_pool/psycopg_pool/sched_async.py b/psycopg_pool/psycopg_pool/sched_async.py new file mode 100644 index 000000000..fe9e443ff --- /dev/null +++ b/psycopg_pool/psycopg_pool/sched_async.py @@ -0,0 +1,100 @@ +""" +A minimal scheduler to schedule tasks run in the future. + +Inspired to the standard library `sched.scheduler`, but designed for +multi-thread usage ground up, not as an afterthought. Tasks can be scheduled in +front of the one currently running and `Scheduler.run()` can be left running +without any task scheduled. + +Tasks are called "Task", not "Event", here, because we actually make use of +`[threading/asyncio].Event` and the two would be confusing. +""" + +# Copyright (C) 2021 The Psycopg Team + +import logging +from time import monotonic +from heapq import heappush, heappop +from typing import Any, Callable, List, Optional + +if True: # ASYNC + from asyncio import Event, Lock, TimeoutError, wait_for +else: + from threading import RLock as Lock, Event + + +from ._task import Task + +logger = logging.getLogger(__name__) + + +class AsyncScheduler: + def __init__(self) -> None: + """Initialize a new instance, passing the time and delay functions.""" + self._queue: List[Task] = [] + self._lock = Lock() + self._event = Event() + + EMPTY_QUEUE_TIMEOUT = 600.0 + + async def enter(self, delay: float, action: Optional[Callable[[], Any]]) -> Task: + """Enter a new task in the queue delayed in the future. + + Schedule a `!None` to stop the execution. + """ + time = monotonic() + delay + return await self.enterabs(time, action) + + async def enterabs(self, time: float, action: Optional[Callable[[], Any]]) -> Task: + """Enter a new task in the queue at an absolute time. + + Schedule a `!None` to stop the execution. + """ + task = Task(time, action) + async with self._lock: + heappush(self._queue, task) + first = self._queue[0] is task + + if first: + self._event.set() + + return task + + async def run(self) -> None: + """Execute the events scheduled.""" + q = self._queue + while True: + async with self._lock: + now = monotonic() + task = q[0] if q else None + if task: + if task.time <= now: + heappop(q) + else: + delay = task.time - now + task = None + else: + delay = self.EMPTY_QUEUE_TIMEOUT + self._event.clear() + + if task: + if not task.action: + break + try: + await task.action() + except Exception as e: + logger.warning( + "scheduled task run %s failed: %s: %s", + task.action, + e.__class__.__name__, + e, + ) + else: + # Block for the expected timeout or until a new task scheduled + if True: # ASYNC + try: + await wait_for(self._event.wait(), delay) + except TimeoutError: + pass + else: + self._event.wait(timeout=delay) diff --git a/tests/pool/test_sched_async.py b/tests/pool/test_sched_async.py index 45c94a5eb..259c66383 100644 --- a/tests/pool/test_sched_async.py +++ b/tests/pool/test_sched_async.py @@ -7,7 +7,7 @@ from functools import partial import pytest try: - from psycopg_pool.sched import AsyncScheduler + from psycopg_pool.sched_async import AsyncScheduler except ImportError: # Tests should have been skipped if the package is not available pass diff --git a/tools/async_to_sync.py b/tools/async_to_sync.py index ff70ca3a4..ab2c4fb8e 100755 --- a/tools/async_to_sync.py +++ b/tools/async_to_sync.py @@ -165,6 +165,7 @@ class RenameAsyncToSync(ast.NodeTransformer): "AsyncQueuedLibpqWriter": "QueuedLibpqWriter", "AsyncRawCursor": "RawCursor", "AsyncRowFactory": "RowFactory", + "AsyncScheduler": "Scheduler", "AsyncServerCursor": "ServerCursor", "AsyncTransaction": "Transaction", "AsyncWriter": "Writer", diff --git a/tools/convert_async_to_sync.sh b/tools/convert_async_to_sync.sh index 21b763526..bea2963a8 100755 --- a/tools/convert_async_to_sync.sh +++ b/tools/convert_async_to_sync.sh @@ -20,6 +20,7 @@ outputs="" for async in \ psycopg/psycopg/connection_async.py \ psycopg/psycopg/cursor_async.py \ + psycopg_pool/psycopg_pool/sched_async.py \ tests/test_client_cursor_async.py \ tests/test_connection_async.py \ tests/test_copy_async.py \