--- /dev/null
+"""
+Task for Scheduler and AsyncScheduler
+"""
+
+# Copyright (C) 2023 The Psycopg Team
+
+from typing import Any, Callable, Optional, NamedTuple
+
+
+class Task(NamedTuple):
+ time: float
+ action: Optional[Callable[[], Any]]
+
+ def __eq__(self, other: "Task") -> Any: # type: ignore[override]
+ return self.time == other.time
+
+ def __lt__(self, other: "Task") -> Any: # type: ignore[override]
+ return self.time < other.time
+
+ def __le__(self, other: "Task") -> Any: # type: ignore[override]
+ return self.time <= other.time
+
+ def __gt__(self, other: "Task") -> Any: # type: ignore[override]
+ return self.time > other.time
+
+ def __ge__(self, other: "Task") -> Any: # type: ignore[override]
+ return self.time >= other.time
from psycopg.rows import TupleRow
from .base import ConnectionAttempt, BasePool
-from .sched import AsyncScheduler
from .errors import PoolClosed, PoolTimeout, TooManyRequests
from ._compat import Deque
+from .sched_async import AsyncScheduler
logger = logging.getLogger("psycopg.pool")
+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'sched_async.py'
+# DO NOT CHANGE! Change the original file instead.
"""
A minimal scheduler to schedule tasks run in the future.
without any task scheduled.
Tasks are called "Task", not "Event", here, because we actually make use of
-`threading.Event` and the two would be confusing.
+`[threading/asyncio].Event` and the two would be confusing.
"""
# Copyright (C) 2021 The Psycopg Team
-import asyncio
import logging
-import threading
from time import monotonic
from heapq import heappush, heappop
-from typing import Any, Callable, List, Optional, NamedTuple
+from typing import Any, Callable, List, Optional
-logger = logging.getLogger(__name__)
-
-
-class Task(NamedTuple):
- time: float
- action: Optional[Callable[[], Any]]
-
- def __eq__(self, other: "Task") -> Any: # type: ignore[override]
- return self.time == other.time
+from threading import RLock as Lock, Event
- def __lt__(self, other: "Task") -> Any: # type: ignore[override]
- return self.time < other.time
+from ._task import Task
- def __le__(self, other: "Task") -> Any: # type: ignore[override]
- return self.time <= other.time
-
- def __gt__(self, other: "Task") -> Any: # type: ignore[override]
- return self.time > other.time
-
- def __ge__(self, other: "Task") -> Any: # type: ignore[override]
- return self.time >= other.time
+logger = logging.getLogger(__name__)
class Scheduler:
def __init__(self) -> None:
"""Initialize a new instance, passing the time and delay functions."""
self._queue: List[Task] = []
- self._lock = threading.RLock()
- self._event = threading.Event()
+ self._lock = Lock()
+ self._event = Event()
EMPTY_QUEUE_TIMEOUT = 600.0
else:
# Block for the expected timeout or until a new task scheduled
self._event.wait(timeout=delay)
-
-
-class AsyncScheduler:
- def __init__(self) -> None:
- """Initialize a new instance, passing the time and delay functions."""
- self._queue: List[Task] = []
- self._lock = asyncio.Lock()
- self._event = asyncio.Event()
-
- EMPTY_QUEUE_TIMEOUT = 600.0
-
- async def enter(self, delay: float, action: Optional[Callable[[], Any]]) -> Task:
- """Enter a new task in the queue delayed in the future.
-
- Schedule a `!None` to stop the execution.
- """
- time = monotonic() + delay
- return await self.enterabs(time, action)
-
- async def enterabs(self, time: float, action: Optional[Callable[[], Any]]) -> Task:
- """Enter a new task in the queue at an absolute time.
-
- Schedule a `!None` to stop the execution.
- """
- task = Task(time, action)
- async with self._lock:
- heappush(self._queue, task)
- first = self._queue[0] is task
-
- if first:
- self._event.set()
-
- return task
-
- async def run(self) -> None:
- """Execute the events scheduled."""
- q = self._queue
- while True:
- async with self._lock:
- now = monotonic()
- task = q[0] if q else None
- if task:
- if task.time <= now:
- heappop(q)
- else:
- delay = task.time - now
- task = None
- else:
- delay = self.EMPTY_QUEUE_TIMEOUT
- self._event.clear()
-
- if task:
- if not task.action:
- break
- try:
- await task.action()
- except Exception as e:
- logger.warning(
- "scheduled task run %s failed: %s: %s",
- task.action,
- e.__class__.__name__,
- e,
- )
- else:
- # Block for the expected timeout or until a new task scheduled
- try:
- await asyncio.wait_for(self._event.wait(), delay)
- except asyncio.TimeoutError:
- pass
--- /dev/null
+"""
+A minimal scheduler to schedule tasks run in the future.
+
+Inspired to the standard library `sched.scheduler`, but designed for
+multi-thread usage ground up, not as an afterthought. Tasks can be scheduled in
+front of the one currently running and `Scheduler.run()` can be left running
+without any task scheduled.
+
+Tasks are called "Task", not "Event", here, because we actually make use of
+`[threading/asyncio].Event` and the two would be confusing.
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+import logging
+from time import monotonic
+from heapq import heappush, heappop
+from typing import Any, Callable, List, Optional
+
+if True: # ASYNC
+ from asyncio import Event, Lock, TimeoutError, wait_for
+else:
+ from threading import RLock as Lock, Event
+
+
+from ._task import Task
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncScheduler:
+ def __init__(self) -> None:
+ """Initialize a new instance, passing the time and delay functions."""
+ self._queue: List[Task] = []
+ self._lock = Lock()
+ self._event = Event()
+
+ EMPTY_QUEUE_TIMEOUT = 600.0
+
+ async def enter(self, delay: float, action: Optional[Callable[[], Any]]) -> Task:
+ """Enter a new task in the queue delayed in the future.
+
+ Schedule a `!None` to stop the execution.
+ """
+ time = monotonic() + delay
+ return await self.enterabs(time, action)
+
+ async def enterabs(self, time: float, action: Optional[Callable[[], Any]]) -> Task:
+ """Enter a new task in the queue at an absolute time.
+
+ Schedule a `!None` to stop the execution.
+ """
+ task = Task(time, action)
+ async with self._lock:
+ heappush(self._queue, task)
+ first = self._queue[0] is task
+
+ if first:
+ self._event.set()
+
+ return task
+
+ async def run(self) -> None:
+ """Execute the events scheduled."""
+ q = self._queue
+ while True:
+ async with self._lock:
+ now = monotonic()
+ task = q[0] if q else None
+ if task:
+ if task.time <= now:
+ heappop(q)
+ else:
+ delay = task.time - now
+ task = None
+ else:
+ delay = self.EMPTY_QUEUE_TIMEOUT
+ self._event.clear()
+
+ if task:
+ if not task.action:
+ break
+ try:
+ await task.action()
+ except Exception as e:
+ logger.warning(
+ "scheduled task run %s failed: %s: %s",
+ task.action,
+ e.__class__.__name__,
+ e,
+ )
+ else:
+ # Block for the expected timeout or until a new task scheduled
+ if True: # ASYNC
+ try:
+ await wait_for(self._event.wait(), delay)
+ except TimeoutError:
+ pass
+ else:
+ self._event.wait(timeout=delay)
import pytest
try:
- from psycopg_pool.sched import AsyncScheduler
+ from psycopg_pool.sched_async import AsyncScheduler
except ImportError:
# Tests should have been skipped if the package is not available
pass
"AsyncQueuedLibpqWriter": "QueuedLibpqWriter",
"AsyncRawCursor": "RawCursor",
"AsyncRowFactory": "RowFactory",
+ "AsyncScheduler": "Scheduler",
"AsyncServerCursor": "ServerCursor",
"AsyncTransaction": "Transaction",
"AsyncWriter": "Writer",
for async in \
psycopg/psycopg/connection_async.py \
psycopg/psycopg/cursor_async.py \
+ psycopg_pool/psycopg_pool/sched_async.py \
tests/test_client_cursor_async.py \
tests/test_connection_async.py \
tests/test_copy_async.py \