if hasattr(selectors, "EpollSelector"):
_epoll_evmasks = {
- WAIT_R: select.EPOLLONESHOT | select.EPOLLIN | select.EPOLLERR,
- WAIT_W: select.EPOLLONESHOT | select.EPOLLOUT | select.EPOLLERR,
- WAIT_RW: select.EPOLLONESHOT
- | (select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR),
+ WAIT_R: select.EPOLLONESHOT | select.EPOLLIN,
+ WAIT_W: select.EPOLLONESHOT | select.EPOLLOUT,
+ WAIT_RW: select.EPOLLONESHOT | select.EPOLLIN | select.EPOLLOUT,
}
else:
_epoll_evmasks = {}
continue
ev = fileevs[0][1]
ready = 0
- if ev & ~select.EPOLLOUT:
+ if ev & select.EPOLLIN:
ready = READY_R
- if ev & ~select.EPOLLIN:
+ if ev & select.EPOLLOUT:
ready |= READY_W
s = gen.send(ready)
evmask = _epoll_evmasks[s]
ev = fileevs[0][1]
ready = 0
- if ev & ~select.POLLOUT:
+ if ev & select.POLLIN:
ready = READY_R
- if ev & ~select.POLLIN:
+ if ev & select.POLLOUT:
ready |= READY_W
s = gen.send(ready)
evmask = _poll_evmasks[s]
import time
import select # noqa: used in pytest.mark.skipif
import socket
+from threading import Event
import pytest
from psycopg.pq import ConnStatus, ExecStatus
from psycopg.conninfo import make_conninfo
+from .acompat import gather, spawn
+
skip_if_not_linux = pytest.mark.skipif(
not sys.platform.startswith("linux"), reason="non-Linux platform"
)
waiting.wait_conn(gen)
+@pytest.mark.slow
+@pytest.mark.parametrize("waitfn", waitfns)
+@skip_if_not_linux
+def test_wait_r(waitfn):
+ waitfn = getattr(waiting, waitfn)
+
+ port = None
+ ev = Event()
+
+ def writer():
+ ev.wait()
+ assert port
+ time.sleep(0.2)
+ with socket.create_connection(("127.0.0.1", port)):
+ pass
+
+ t = spawn(writer)
+
+ def gen():
+ r = yield waiting.Wait.R
+ return r
+
+ with socket.socket() as s:
+ s.bind(("", 0))
+ port = s.getsockname()[1]
+ s.listen()
+ ev.set()
+ t0 = time.time()
+ r = waitfn(gen(), s.fileno(), 0.3)
+ tf = time.time()
+ assert r == waiting.Ready.R
+ assert 0.2 <= tf - t0 < 0.3
+
+ gather(t)
+
+
@pytest.mark.parametrize("waitfn", waitfns)
@pytest.mark.parametrize("event", events)
@skip_if_not_linux