from . import exceptions as exc
from . import cursor
from .conninfo import make_conninfo
-from .waiting import wait_select, wait_async, Wait, Ready
+from .waiting import wait, wait_async, Wait, Ready
logger = logging.getLogger(__name__)
break
ready = yield pgconn.socket, Wait.RW
- if ready is Ready.R:
+ if ready & Ready.R:
pgconn.consume_input()
continue
)
@classmethod
- def wait(cls, gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV:
- return wait_select(gen)
+ def wait(
+ cls,
+ gen: Generator[Tuple[int, Wait], Ready, RV],
+ timeout: Optional[float] = 0.1,
+ ) -> RV:
+ return wait(gen, timeout=timeout)
class AsyncConnection(BaseConnection):
# Copyright (C) 2020 The Psycopg Team
-from enum import Enum
-from select import select
-from typing import Generator, Tuple, TypeVar
+from enum import IntEnum
+from typing import Generator, Optional, Tuple, TypeVar
from asyncio import get_event_loop, Event
+from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from . import exceptions as exc
-Wait = Enum("Wait", "R W RW")
-Ready = Enum("Ready", "R W")
+class Wait(IntEnum):
+ R = EVENT_READ
+ W = EVENT_WRITE
+ RW = EVENT_READ | EVENT_WRITE
-RV = TypeVar("RV")
+class Ready(IntEnum):
+ R = EVENT_READ
+ W = EVENT_WRITE
-def wait_select(gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV:
- """
- Wait on the behalf of a generator using select().
- *gen* is expected to generate tuples (fd, status). consume it and block
- according to the status until fd is ready. Send back the ready state
- to the generator.
+RV = TypeVar("RV")
- Return what the generator eventually returned.
+
+def wait(
+ gen: Generator[Tuple[int, Wait], Ready, RV],
+ timeout: Optional[float] = None,
+) -> RV:
+ """
+ Wait for a generator using the best option available on the platform.
+
+ :param gen: a generator performing database operations and yielding
+ (fd, `Ready`) pairs when it would block.
+ :param timeout: timeout (in seconds) to check for other interrupt, e.g.
+ to allow Ctrl-C.
+ :type timeout: float
+ :return: whatever *gen* returns on completion.
"""
+ sel = DefaultSelector()
try:
+ fd, s = next(gen)
while 1:
- fd, s = next(gen)
- if s is Wait.R:
- rf, wf, xf = select([fd], [], [])
- assert rf
- gen.send(Ready.R)
- elif s is Wait.W:
- rf, wf, xf = select([], [fd], [])
- assert wf
- gen.send(Ready.W)
- elif s is Wait.RW:
- rf, wf, xf = select([fd], [fd], [])
- assert rf or wf
- assert not (rf and wf)
- if rf:
- gen.send(Ready.R)
- else:
- gen.send(Ready.W)
- else:
- raise exc.InternalError("bad poll status: %s")
+ sel.register(fd, s)
+ ready = None
+ while not ready:
+ ready = sel.select(timeout=timeout)
+ sel.unregister(fd)
+
+ assert len(ready) == 1
+ fd, s = gen.send(ready[0][1])
+
except StopIteration as e:
rv: RV = e.args[0]
return rv
ev.set()
try:
+ fd, s = next(gen)
while 1:
- fd, s = next(gen)
ev.clear()
if s is Wait.R:
loop.add_reader(fd, wakeup, Ready.R)
await ev.wait()
loop.remove_reader(fd)
- gen.send(ready)
elif s is Wait.W:
loop.add_writer(fd, wakeup, Ready.W)
await ev.wait()
loop.remove_writer(fd)
- gen.send(ready)
elif s is Wait.RW:
loop.add_reader(fd, wakeup, Ready.R)
loop.add_writer(fd, wakeup, Ready.W)
await ev.wait()
loop.remove_reader(fd)
loop.remove_writer(fd)
- gen.send(ready)
else:
raise exc.InternalError("bad poll status: %s")
+ fd, s = gen.send(ready)
+
except StopIteration as e:
rv: RV = e.args[0]
return rv