]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor(pool): generate Scheduler from AsyncScheduler
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 11 Sep 2023 16:10:17 +0000 (17:10 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 11 Oct 2023 21:45:38 +0000 (23:45 +0200)
psycopg_pool/psycopg_pool/_task.py [new file with mode: 0644]
psycopg_pool/psycopg_pool/pool_async.py
psycopg_pool/psycopg_pool/sched.py
psycopg_pool/psycopg_pool/sched_async.py [new file with mode: 0644]
tests/pool/test_sched_async.py
tools/async_to_sync.py
tools/convert_async_to_sync.sh

diff --git a/psycopg_pool/psycopg_pool/_task.py b/psycopg_pool/psycopg_pool/_task.py
new file mode 100644 (file)
index 0000000..4197650
--- /dev/null
@@ -0,0 +1,27 @@
+"""
+Task for Scheduler and AsyncScheduler
+"""
+
+# Copyright (C) 2023 The Psycopg Team
+
+from typing import Any, Callable, Optional, NamedTuple
+
+
+class Task(NamedTuple):
+    time: float
+    action: Optional[Callable[[], Any]]
+
+    def __eq__(self, other: "Task") -> Any:  # type: ignore[override]
+        return self.time == other.time
+
+    def __lt__(self, other: "Task") -> Any:  # type: ignore[override]
+        return self.time < other.time
+
+    def __le__(self, other: "Task") -> Any:  # type: ignore[override]
+        return self.time <= other.time
+
+    def __gt__(self, other: "Task") -> Any:  # type: ignore[override]
+        return self.time > other.time
+
+    def __ge__(self, other: "Task") -> Any:  # type: ignore[override]
+        return self.time >= other.time
index 90a7abb02f8e46bcafc2104558d2a2fb8ff1bbd5..9bd1766dbfb5e787c788e3e8da4e1a756f6493f6 100644 (file)
@@ -22,9 +22,9 @@ from psycopg.pq import TransactionStatus
 from psycopg.rows import TupleRow
 
 from .base import ConnectionAttempt, BasePool
-from .sched import AsyncScheduler
 from .errors import PoolClosed, PoolTimeout, TooManyRequests
 from ._compat import Deque
+from .sched_async import AsyncScheduler
 
 logger = logging.getLogger("psycopg.pool")
 
index ca26007324a50a6267f33f28bcc1b3fb0defcbbf..2c6f3c0e85314aa6aa74fbb9a7f213b1bb2c06ba 100644 (file)
@@ -1,3 +1,6 @@
+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'sched_async.py'
+# DO NOT CHANGE! Change the original file instead.
 """
 A minimal scheduler to schedule tasks run in the future.
 
@@ -7,47 +10,29 @@ front of the one currently running and `Scheduler.run()` can be left running
 without any task scheduled.
 
 Tasks are called "Task", not "Event", here, because we actually make use of
-`threading.Event` and the two would be confusing.
+`[threading/asyncio].Event` and the two would be confusing.
 """
 
 # Copyright (C) 2021 The Psycopg Team
 
-import asyncio
 import logging
-import threading
 from time import monotonic
 from heapq import heappush, heappop
-from typing import Any, Callable, List, Optional, NamedTuple
+from typing import Any, Callable, List, Optional
 
-logger = logging.getLogger(__name__)
-
-
-class Task(NamedTuple):
-    time: float
-    action: Optional[Callable[[], Any]]
-
-    def __eq__(self, other: "Task") -> Any:  # type: ignore[override]
-        return self.time == other.time
+from threading import RLock as Lock, Event
 
-    def __lt__(self, other: "Task") -> Any:  # type: ignore[override]
-        return self.time < other.time
+from ._task import Task
 
-    def __le__(self, other: "Task") -> Any:  # type: ignore[override]
-        return self.time <= other.time
-
-    def __gt__(self, other: "Task") -> Any:  # type: ignore[override]
-        return self.time > other.time
-
-    def __ge__(self, other: "Task") -> Any:  # type: ignore[override]
-        return self.time >= other.time
+logger = logging.getLogger(__name__)
 
 
 class Scheduler:
     def __init__(self) -> None:
         """Initialize a new instance, passing the time and delay functions."""
         self._queue: List[Task] = []
-        self._lock = threading.RLock()
-        self._event = threading.Event()
+        self._lock = Lock()
+        self._event = Event()
 
     EMPTY_QUEUE_TIMEOUT = 600.0
 
@@ -106,72 +91,3 @@ class Scheduler:
             else:
                 # Block for the expected timeout or until a new task scheduled
                 self._event.wait(timeout=delay)
-
-
-class AsyncScheduler:
-    def __init__(self) -> None:
-        """Initialize a new instance, passing the time and delay functions."""
-        self._queue: List[Task] = []
-        self._lock = asyncio.Lock()
-        self._event = asyncio.Event()
-
-    EMPTY_QUEUE_TIMEOUT = 600.0
-
-    async def enter(self, delay: float, action: Optional[Callable[[], Any]]) -> Task:
-        """Enter a new task in the queue delayed in the future.
-
-        Schedule a `!None` to stop the execution.
-        """
-        time = monotonic() + delay
-        return await self.enterabs(time, action)
-
-    async def enterabs(self, time: float, action: Optional[Callable[[], Any]]) -> Task:
-        """Enter a new task in the queue at an absolute time.
-
-        Schedule a `!None` to stop the execution.
-        """
-        task = Task(time, action)
-        async with self._lock:
-            heappush(self._queue, task)
-            first = self._queue[0] is task
-
-        if first:
-            self._event.set()
-
-        return task
-
-    async def run(self) -> None:
-        """Execute the events scheduled."""
-        q = self._queue
-        while True:
-            async with self._lock:
-                now = monotonic()
-                task = q[0] if q else None
-                if task:
-                    if task.time <= now:
-                        heappop(q)
-                    else:
-                        delay = task.time - now
-                        task = None
-                else:
-                    delay = self.EMPTY_QUEUE_TIMEOUT
-                self._event.clear()
-
-            if task:
-                if not task.action:
-                    break
-                try:
-                    await task.action()
-                except Exception as e:
-                    logger.warning(
-                        "scheduled task run %s failed: %s: %s",
-                        task.action,
-                        e.__class__.__name__,
-                        e,
-                    )
-            else:
-                # Block for the expected timeout or until a new task scheduled
-                try:
-                    await asyncio.wait_for(self._event.wait(), delay)
-                except asyncio.TimeoutError:
-                    pass
diff --git a/psycopg_pool/psycopg_pool/sched_async.py b/psycopg_pool/psycopg_pool/sched_async.py
new file mode 100644 (file)
index 0000000..fe9e443
--- /dev/null
@@ -0,0 +1,100 @@
+"""
+A minimal scheduler to schedule tasks run in the future.
+
+Inspired to the standard library `sched.scheduler`, but designed for
+multi-thread usage ground up, not as an afterthought. Tasks can be scheduled in
+front of the one currently running and `Scheduler.run()` can be left running
+without any task scheduled.
+
+Tasks are called "Task", not "Event", here, because we actually make use of
+`[threading/asyncio].Event` and the two would be confusing.
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+import logging
+from time import monotonic
+from heapq import heappush, heappop
+from typing import Any, Callable, List, Optional
+
+if True:  # ASYNC
+    from asyncio import Event, Lock, TimeoutError, wait_for
+else:
+    from threading import RLock as Lock, Event
+
+
+from ._task import Task
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncScheduler:
+    def __init__(self) -> None:
+        """Initialize a new instance, passing the time and delay functions."""
+        self._queue: List[Task] = []
+        self._lock = Lock()
+        self._event = Event()
+
+    EMPTY_QUEUE_TIMEOUT = 600.0
+
+    async def enter(self, delay: float, action: Optional[Callable[[], Any]]) -> Task:
+        """Enter a new task in the queue delayed in the future.
+
+        Schedule a `!None` to stop the execution.
+        """
+        time = monotonic() + delay
+        return await self.enterabs(time, action)
+
+    async def enterabs(self, time: float, action: Optional[Callable[[], Any]]) -> Task:
+        """Enter a new task in the queue at an absolute time.
+
+        Schedule a `!None` to stop the execution.
+        """
+        task = Task(time, action)
+        async with self._lock:
+            heappush(self._queue, task)
+            first = self._queue[0] is task
+
+        if first:
+            self._event.set()
+
+        return task
+
+    async def run(self) -> None:
+        """Execute the events scheduled."""
+        q = self._queue
+        while True:
+            async with self._lock:
+                now = monotonic()
+                task = q[0] if q else None
+                if task:
+                    if task.time <= now:
+                        heappop(q)
+                    else:
+                        delay = task.time - now
+                        task = None
+                else:
+                    delay = self.EMPTY_QUEUE_TIMEOUT
+                self._event.clear()
+
+            if task:
+                if not task.action:
+                    break
+                try:
+                    await task.action()
+                except Exception as e:
+                    logger.warning(
+                        "scheduled task run %s failed: %s: %s",
+                        task.action,
+                        e.__class__.__name__,
+                        e,
+                    )
+            else:
+                # Block for the expected timeout or until a new task scheduled
+                if True:  # ASYNC
+                    try:
+                        await wait_for(self._event.wait(), delay)
+                    except TimeoutError:
+                        pass
+                else:
+                    self._event.wait(timeout=delay)
index 45c94a5eb41171f125cfb0c82fb41c83dd9e619a..259c66383bfa36911a0e18d8df9bcee4ec367ba1 100644 (file)
@@ -7,7 +7,7 @@ from functools import partial
 import pytest
 
 try:
-    from psycopg_pool.sched import AsyncScheduler
+    from psycopg_pool.sched_async import AsyncScheduler
 except ImportError:
     # Tests should have been skipped if the package is not available
     pass
index ff70ca3a439cef413b464ccb4cd9e95b729833cd..ab2c4fb8e9cf52046a4ea93a92e96c38c1fc9073 100755 (executable)
@@ -165,6 +165,7 @@ class RenameAsyncToSync(ast.NodeTransformer):
         "AsyncQueuedLibpqWriter": "QueuedLibpqWriter",
         "AsyncRawCursor": "RawCursor",
         "AsyncRowFactory": "RowFactory",
+        "AsyncScheduler": "Scheduler",
         "AsyncServerCursor": "ServerCursor",
         "AsyncTransaction": "Transaction",
         "AsyncWriter": "Writer",
index 21b76352658d806e747d51796aa96d5e58230095..bea2963a86978f46f30d8f558e0af1a0d57cf92c 100755 (executable)
@@ -20,6 +20,7 @@ outputs=""
 for async in \
     psycopg/psycopg/connection_async.py \
     psycopg/psycopg/cursor_async.py \
+    psycopg_pool/psycopg_pool/sched_async.py \
     tests/test_client_cursor_async.py \
     tests/test_connection_async.py \
     tests/test_copy_async.py \