import sys
import select
import selectors
-from typing import Dict, Optional
+from typing import Optional
from asyncio import get_event_loop, wait_for, Event, TimeoutError
from selectors import DefaultSelector
def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
"""
Wait for a generator using select where supported.
+
+ BUG: on Linux, can't select on FD >= 1024. On Windows it's fine.
"""
try:
s = next(gen)
return rv
-poll_evmasks: Dict[Wait, int]
-
if hasattr(selectors, "EpollSelector"):
- poll_evmasks = {
- WAIT_R: select.EPOLLONESHOT | select.EPOLLIN,
- WAIT_W: select.EPOLLONESHOT | select.EPOLLOUT,
- WAIT_RW: select.EPOLLONESHOT | select.EPOLLIN | select.EPOLLOUT,
+ _epoll_evmasks = {
+ WAIT_R: select.EPOLLONESHOT | select.EPOLLIN | select.EPOLLERR,
+ WAIT_W: select.EPOLLONESHOT | select.EPOLLOUT | select.EPOLLERR,
+ WAIT_RW: select.EPOLLONESHOT
+ | (select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR),
}
else:
- poll_evmasks = {}
+ _epoll_evmasks = {}
def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
strategy is `epoll` then this function will be used instead of `wait`.
See also: https://linux.die.net/man/2/epoll_ctl
+
+ BUG: if the connection FD is closed, `epoll.poll()` hangs. Same for
+ EpollSelector. For this reason, wait_poll() is currently preferable.
+ To reproduce the bug:
+
+ export PSYCOPG_WAIT_FUNC=wait_epoll
+ pytest tests/test_concurrency.py::test_concurrent_close
"""
try:
s = next(gen)
timeout = int(timeout * 1000.0)
with select.epoll() as epoll:
- evmask = poll_evmasks[s]
+ evmask = _epoll_evmasks[s]
epoll.register(fileno, evmask)
while True:
fileevs = None
ready |= READY_W
# assert s & ready
s = gen.send(ready)
- evmask = poll_evmasks[s]
+ evmask = _epoll_evmasks[s]
epoll.modify(fileno, evmask)
except StopIteration as ex:
return rv
+if hasattr(selectors, "PollSelector"):
+ _poll_evmasks = {
+ WAIT_R: select.POLLIN,
+ WAIT_W: select.POLLOUT,
+ WAIT_RW: select.POLLIN | select.POLLOUT,
+ }
+else:
+ _poll_evmasks = {}
+
+
+def wait_poll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
+ """
+ Wait for a generator using poll where supported.
+
+ Parameters are like for `wait()`.
+ """
+ try:
+ s = next(gen)
+
+ if timeout is None or timeout < 0:
+ timeout = 0
+ else:
+ timeout = int(timeout * 1000.0)
+
+ poll = select.poll()
+ evmask = _poll_evmasks[s]
+ poll.register(fileno, evmask)
+ while True:
+ fileevs = None
+ while not fileevs:
+ fileevs = poll.poll(timeout)
+ ev = fileevs[0][1]
+ ready = 0
+ if ev & ~select.POLLOUT:
+ ready = READY_R
+ if ev & ~select.POLLIN:
+ ready |= READY_W
+ # assert s & ready
+ s = gen.send(ready)
+ evmask = _poll_evmasks[s]
+ poll.modify(fileno, evmask)
+
+ except StopIteration as ex:
+ rv: RV = ex.args[0] if ex.args else None
+ return rv
+
+
if _psycopg:
wait_c = _psycopg.wait_c
# On Windows, SelectSelector should be the default.
wait = wait_select
-elif selectors.DefaultSelector is getattr(selectors, "EpollSelector", None):
- wait = wait_epoll
+elif hasattr(selectors, "PollSelector"):
+ # On linux, EpollSelector is the default. However, it hangs if the fd is
+ # closed while polling.
+ wait = wait_poll
else:
wait = wait_selector