import select
import selectors
from enum import IntEnum
-from typing import Optional
+from typing import Dict, Optional
from asyncio import get_event_loop, wait_for, Event, TimeoutError
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
return rv
+def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
+ """
+ Wait for a generator using select where supported.
+ """
+ try:
+ s = next(gen)
+
+ empty = ()
+ fnlist = (fileno,)
+ while True:
+ rl, wl, xl = select.select(
+ fnlist if s & WAIT_R else empty,
+ fnlist if s & WAIT_W else empty,
+ fnlist,
+ timeout,
+ )
+ ready = 0
+ if rl:
+ ready = READY_R
+ if wl:
+ ready |= READY_W
+ if not ready:
+ continue
+ assert s & ready
+ s = gen.send(ready) # type: ignore
+
+ except StopIteration as ex:
+ rv: RV = ex.args[0] if ex.args else None
+ return rv
+
+
+poll_evmasks: Dict[Wait, int]
+
+if hasattr(selectors, "EpollSelector"):
+ poll_evmasks = {
+ WAIT_R: select.EPOLLONESHOT | select.EPOLLIN,
+ WAIT_W: select.EPOLLONESHOT | select.EPOLLOUT,
+ WAIT_RW: select.EPOLLONESHOT | select.EPOLLIN | select.EPOLLOUT,
+ }
+else:
+ poll_evmasks = {}
+
+
def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
"""
Wait for a generator using epoll where supported.
if selectors.DefaultSelector is getattr(selectors, "EpollSelector", None):
- wait = wait_epoll
-
- poll_evmasks = {
- Wait.R: select.EPOLLONESHOT | select.EPOLLIN,
- Wait.W: select.EPOLLONESHOT | select.EPOLLOUT,
- Wait.RW: select.EPOLLONESHOT | select.EPOLLIN | select.EPOLLOUT,
- }
+ # NOTE: select seems more performing than epoll. It is admittedly unlikely
+ # that a platform has epoll but not select, so maybe we could kill
+ # wait_epoll altogether(). More testing to do.
+ if hasattr(selectors, "SelectSelector"):
+ wait = wait_select
+ else:
+ wait = wait_epoll
else:
wait = wait_selector