"""
def __call__(
- self, gen: PQGen[RV], fileno: int, timeout: Optional[float] = None
+ self, gen: PQGen[RV], fileno: int, interval: Optional[float] = None
) -> RV: ...
try:
conninfo = make_conninfo("", **attempt)
gen = cls._connect_gen(conninfo, timeout=timeout)
- rv = waiting.wait_conn(gen, timeout=_WAIT_INTERVAL)
+ rv = waiting.wait_conn(gen, interval=_WAIT_INTERVAL)
except e._NO_TRACEBACK as ex:
if len(attempts) > 1:
logger.debug(
# into shorter interval.
if timeout is not None:
deadline = monotonic() + timeout
- timeout = min(timeout, _WAIT_INTERVAL)
+ interval = min(timeout, _WAIT_INTERVAL)
else:
deadline = None
- timeout = _WAIT_INTERVAL
+ interval = _WAIT_INTERVAL
nreceived = 0
# notification is received to makes sure that they are consistent.
try:
with self.lock:
- ns = self.wait(notifies(self.pgconn), timeout=timeout)
+ ns = self.wait(notifies(self.pgconn), interval=interval)
if ns:
enc = pgconn_encoding(self.pgconn)
except e._NO_TRACEBACK as ex:
# Check the deadline after the loop to ensure that timeout=0
# polls at least once.
if deadline:
- timeout = min(_WAIT_INTERVAL, deadline - monotonic())
- if timeout < 0.0:
+ interval = min(_WAIT_INTERVAL, deadline - monotonic())
+ if interval < 0.0:
break
@contextmanager
assert pipeline is self._pipeline
self._pipeline = None
- def wait(self, gen: PQGen[RV], timeout: Optional[float] = _WAIT_INTERVAL) -> RV:
+ def wait(self, gen: PQGen[RV], interval: Optional[float] = _WAIT_INTERVAL) -> RV:
"""
Consume a generator operating on the connection.
fd (i.e. not on connect and reset).
"""
try:
- return waiting.wait(gen, self.pgconn.socket, timeout=timeout)
+ return waiting.wait(gen, self.pgconn.socket, interval=interval)
except _INTERRUPTED:
if self.pgconn.transaction_status == ACTIVE:
# On Ctrl-C, try to cancel the query in the server, otherwise
# the connection will remain stuck in ACTIVE state.
self._try_cancel(self.pgconn)
try:
- waiting.wait(gen, self.pgconn.socket, timeout=timeout)
+ waiting.wait(gen, self.pgconn.socket, interval=interval)
except e.QueryCanceled:
pass # as expected
raise
try:
conninfo = make_conninfo("", **attempt)
gen = cls._connect_gen(conninfo, timeout=timeout)
- rv = await waiting.wait_conn_async(gen, timeout=_WAIT_INTERVAL)
+ rv = await waiting.wait_conn_async(gen, interval=_WAIT_INTERVAL)
except e._NO_TRACEBACK as ex:
if len(attempts) > 1:
logger.debug(
# into shorter interval.
if timeout is not None:
deadline = monotonic() + timeout
- timeout = min(timeout, _WAIT_INTERVAL)
+ interval = min(timeout, _WAIT_INTERVAL)
else:
deadline = None
- timeout = _WAIT_INTERVAL
+ interval = _WAIT_INTERVAL
nreceived = 0
# notification is received to makes sure that they are consistent.
try:
async with self.lock:
- ns = await self.wait(notifies(self.pgconn), timeout=timeout)
+ ns = await self.wait(notifies(self.pgconn), interval=interval)
if ns:
enc = pgconn_encoding(self.pgconn)
except e._NO_TRACEBACK as ex:
# Check the deadline after the loop to ensure that timeout=0
# polls at least once.
if deadline:
- timeout = min(_WAIT_INTERVAL, deadline - monotonic())
- if timeout < 0.0:
+ interval = min(_WAIT_INTERVAL, deadline - monotonic())
+ if interval < 0.0:
break
@asynccontextmanager
self._pipeline = None
async def wait(
- self, gen: PQGen[RV], timeout: Optional[float] = _WAIT_INTERVAL
+ self, gen: PQGen[RV], interval: Optional[float] = _WAIT_INTERVAL
) -> RV:
"""
Consume a generator operating on the connection.
fd (i.e. not on connect and reset).
"""
try:
- return await waiting.wait_async(gen, self.pgconn.socket, timeout=timeout)
+ return await waiting.wait_async(gen, self.pgconn.socket, interval=interval)
except _INTERRUPTED:
if self.pgconn.transaction_status == ACTIVE:
# On Ctrl-C, try to cancel the query in the server, otherwise
# the connection will remain stuck in ACTIVE state.
self._try_cancel(self.pgconn)
try:
- await waiting.wait_async(gen, self.pgconn.socket, timeout=timeout)
+ await waiting.wait_async(gen, self.pgconn.socket, interval=interval)
except e.QueryCanceled:
pass # as expected
raise
logger = logging.getLogger(__name__)
-def wait_selector(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
+def wait_selector(gen: PQGen[RV], fileno: int, interval: Optional[float] = None) -> RV:
"""
Wait for a generator using the best strategy available.
:param gen: a generator performing database operations and yielding
`Ready` values when it would block.
:param fileno: the file descriptor to wait on.
- :param timeout: timeout (in seconds) to check for other interrupt, e.g.
- to allow Ctrl-C.
- :type timeout: float
+ :param interval: interval (in seconds) to check for other interrupt, e.g.
+ to allow Ctrl-C. If zero or None, wait indefinitely.
:return: whatever `!gen` returns on completion.
Consume `!gen`, scheduling `fileno` for completion when it is reported to
with DefaultSelector() as sel:
sel.register(fileno, s)
while True:
- rlist = sel.select(timeout=timeout)
+ rlist = sel.select(timeout=interval)
if not rlist:
gen.send(READY_NONE)
continue
return rv
-def wait_conn(gen: PQGenConn[RV], timeout: Optional[float] = None) -> RV:
+def wait_conn(gen: PQGenConn[RV], interval: Optional[float] = None) -> RV:
"""
Wait for a connection generator using the best strategy available.
:param gen: a generator performing database operations and yielding
(fd, `Ready`) pairs when it would block.
- :param timeout: timeout (in seconds) to check for other interrupt, e.g.
+ :param interval: interval (in seconds) to check for other interrupt, e.g.
to allow Ctrl-C. If zero or None, wait indefinitely.
- :type timeout: float
:return: whatever `!gen` returns on completion.
Behave like in `wait()`, but take the fileno to wait from the generator
"""
try:
fileno, s = next(gen)
- if not timeout:
- timeout = None
+ if not interval:
+ interval = None
with DefaultSelector() as sel:
sel.register(fileno, s)
while True:
- rlist = sel.select(timeout=timeout)
+ rlist = sel.select(timeout=interval)
if not rlist:
gen.send(READY_NONE)
continue
async def wait_async(
- gen: PQGen[RV], fileno: int, timeout: Optional[float] = None
+ gen: PQGen[RV], fileno: int, interval: Optional[float] = None
) -> RV:
"""
Coroutine waiting for a generator to complete.
:param gen: a generator performing database operations and yielding
`Ready` values when it would block.
:param fileno: the file descriptor to wait on.
- :param timeout: timeout (in seconds) to check for other interrupt, e.g.
- to allow Ctrl-C. If zero, wait indefinitely.
+ :param interval: interval (in seconds) to check for other interrupt, e.g.
+ to allow Ctrl-C. If None, wait indefinitely.
:return: whatever `!gen` returns on completion.
Behave like in `wait()`, but exposing an `asyncio` interface.
if writer:
loop.add_writer(fileno, wakeup, READY_W)
try:
- if timeout is not None:
+ if interval is not None:
try:
- await wait_for(ev.wait(), timeout)
+ await wait_for(ev.wait(), interval)
except TimeoutError:
pass
else:
return rv
-async def wait_conn_async(gen: PQGenConn[RV], timeout: Optional[float] = None) -> RV:
+async def wait_conn_async(gen: PQGenConn[RV], interval: Optional[float] = None) -> RV:
"""
Coroutine waiting for a connection generator to complete.
:param gen: a generator performing database operations and yielding
(fd, `Ready`) pairs when it would block.
- :param timeout: timeout (in seconds) to check for other interrupt, e.g.
+ :param interval: interval (in seconds) to check for other interrupt, e.g.
to allow Ctrl-C. If zero or None, wait indefinitely.
:return: whatever `!gen` returns on completion.
if writer:
loop.add_writer(fileno, wakeup, READY_W)
try:
- if timeout:
+ if interval:
try:
- await wait_for(ev.wait(), timeout)
+ await wait_for(ev.wait(), interval)
except TimeoutError:
pass
else:
# Specialised implementation of wait functions.
-def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
+def wait_select(gen: PQGen[RV], fileno: int, interval: Optional[float] = None) -> RV:
"""
Wait for a generator using select where supported.
fnlist if s & WAIT_R else empty,
fnlist if s & WAIT_W else empty,
fnlist,
- timeout,
+ interval,
)
ready = 0
if rl:
_epoll_evmasks = {}
-def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
+def wait_epoll(gen: PQGen[RV], fileno: int, interval: Optional[float] = None) -> RV:
"""
Wait for a generator using epoll where supported.
try:
s = next(gen)
- if timeout is None or timeout < 0:
- timeout = 0.0
+ if interval is None or interval < 0:
+ interval = 0.0
with select.epoll() as epoll:
evmask = _epoll_evmasks[s]
epoll.register(fileno, evmask)
while True:
- fileevs = epoll.poll(timeout)
+ fileevs = epoll.poll(interval)
if not fileevs:
gen.send(READY_NONE)
continue
_poll_evmasks = {}
-def wait_poll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
+def wait_poll(gen: PQGen[RV], fileno: int, interval: Optional[float] = None) -> RV:
"""
Wait for a generator using poll where supported.
try:
s = next(gen)
- if timeout is None or timeout < 0:
- timeout = 0
+ if interval is None or interval < 0:
+ interval = 0
else:
- timeout = int(timeout * 1000.0)
+ interval = int(interval * 1000.0)
poll = select.poll()
evmask = _poll_evmasks[s]
poll.register(fileno, evmask)
while True:
- fileevs = poll.poll(timeout)
+ fileevs = poll.poll(interval)
if not fileevs:
gen.send(READY_NONE)
continue
pgconn: PGconn, commands: Deque[abc.PipelineCommand]
) -> abc.PQGen[List[List[PGresult]]]: ...
def wait_c(
- gen: abc.PQGen[abc.RV], fileno: int, timeout: Optional[float] = None
+ gen: abc.PQGen[abc.RV], fileno: int, interval: Optional[float] = None
) -> abc.RV: ...
# Copy support
cdef int wait_c_impl(int fileno, int wait, float timeout) except -1
-def wait_c(gen: PQGen[RV], int fileno, timeout = None) -> RV:
+def wait_c(gen: PQGen[RV], int fileno, interval = None) -> RV:
"""
Wait for a generator using poll or select.
"""
- cdef float ctimeout
+ cdef float cinterval
cdef int wait, ready
cdef PyObject *pyready
- if timeout is None:
- ctimeout = -1.0
+ if interval is None:
+ cinterval = -1.0
else:
- ctimeout = <float>float(timeout)
- if ctimeout < 0.0:
- ctimeout = -1.0
+ cinterval = <float>float(interval)
+ if cinterval < 0.0:
+ cinterval = -1.0
send = gen.send
wait = next(gen)
while True:
- ready = wait_c_impl(fileno, wait, ctimeout)
+ ready = wait_c_impl(fileno, wait, cinterval)
if ready == READY_NONE:
pyready = <PyObject *>PY_READY_NONE
elif ready == READY_R:
]
events = ["R", "W", "RW"]
-timeouts = [pytest.param({}, id="blank")]
-timeouts += [pytest.param({"timeout": x}, id=str(x)) for x in [None, 0, 0.2, 10]]
+intervals = [pytest.param({}, id="blank")]
+intervals += [pytest.param({"interval": x}, id=str(x)) for x in [None, 0, 0.2, 10]]
-@pytest.mark.parametrize("timeout", timeouts)
+@pytest.mark.parametrize("timeout", intervals)
def test_wait_conn(dsn, timeout):
gen = generators.connect(dsn)
conn = waiting.wait_conn(gen, **timeout)
@pytest.mark.parametrize("waitfn", waitfns)
-@pytest.mark.parametrize("timeout", timeouts)
+@pytest.mark.parametrize("timeout", intervals)
def test_wait(pgconn, waitfn, timeout):
waitfn = getattr(waiting, waitfn)
except StopIteration as ex:
return ex.value
- (res,) = waitfn(gen_wrapper(), pgconn.socket, timeout=0.1)
+ (res,) = waitfn(gen_wrapper(), pgconn.socket, interval=0.1)
assert res.status == ExecStatus.TUPLES_OK
ds = [t1 - t0 for t0, t1 in zip(ts[:-1], ts[1:])]
assert len(ds) >= 5
f.close()
-@pytest.mark.parametrize("timeout", timeouts)
+@pytest.mark.parametrize("timeout", intervals)
@pytest.mark.anyio
async def test_wait_conn_async(dsn, timeout):
gen = generators.connect(dsn)