COPY_BOTH = pq.ExecStatus.COPY_BOTH
PIPELINE_SYNC = pq.ExecStatus.PIPELINE_SYNC
+WAIT_R = Wait.R
+WAIT_W = Wait.W
+WAIT_RW = Wait.RW
+READY_R = Ready.R
+READY_W = Ready.W
+READY_RW = Ready.RW
+
logger = logging.getLogger(__name__)
if status == POLL_OK:
break
elif status == POLL_READING:
- yield conn.socket, Wait.R
+ yield conn.socket, WAIT_R
elif status == POLL_WRITING:
- yield conn.socket, Wait.W
+ yield conn.socket, WAIT_W
elif status == POLL_FAILED:
encoding = conninfo_encoding(conninfo)
raise e.OperationalError(
if f == 0:
break
- ready = yield Wait.RW
- if ready & Ready.R:
+ ready = yield WAIT_RW
+ if ready & READY_R:
# This call may read notifies: they will be saved in the
# PGconn buffer and passed to Python later, in `fetch()`.
pgconn.consume_input()
Return a result from the database (whether success or error).
"""
if pgconn.is_busy():
- yield Wait.R
+ yield WAIT_R
while True:
pgconn.consume_input()
if not pgconn.is_busy():
break
- yield Wait.R
+ yield WAIT_R
# Consume notifies
while True:
results = []
while True:
- ready = yield Wait.RW
+ ready = yield WAIT_RW
- if ready & Ready.R:
+ if ready & READY_R:
pgconn.consume_input()
while True:
n = pgconn.notifies()
else:
res.append(r)
- if ready & Ready.W:
+ if ready & READY_W:
pgconn.flush()
if not commands:
break
def notifies(pgconn: PGconn) -> PQGen[List[pq.PGnotify]]:
- yield Wait.R
+ yield WAIT_R
pgconn.consume_input()
ns = []
break
# would block
- yield Wait.R
+ yield WAIT_R
pgconn.consume_input()
if nbytes > 0:
# into smaller ones. We prefer to do it there instead of here in order to
# do it upstream the queue decoupling the writer task from the producer one.
while pgconn.put_copy_data(buffer) == 0:
- yield Wait.W
+ yield WAIT_W
def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]:
# Retry enqueuing end copy message until successful
while pgconn.put_copy_end(error) == 0:
- yield Wait.W
+ yield WAIT_W
# Repeat until it the message is flushed to the server
while True:
- yield Wait.W
+ yield WAIT_W
f = pgconn.flush()
if f == 0:
break
RW = EVENT_READ | EVENT_WRITE
+WAIT_R = Wait.R
+WAIT_W = Wait.W
+WAIT_RW = Wait.RW
+READY_R = Ready.R
+READY_W = Ready.W
+READY_RW = Ready.RW
+
+
def wait_selector(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
"""
Wait for a generator using the best strategy available.
try:
s = next(gen)
while True:
- reader = s & Wait.R
- writer = s & Wait.W
+ reader = s & WAIT_R
+ writer = s & WAIT_W
if not reader and not writer:
raise e.InternalError(f"bad poll status: {s}")
ev.clear()
ready = 0 # type: ignore[assignment]
if reader:
- loop.add_reader(fileno, wakeup, Ready.R)
+ loop.add_reader(fileno, wakeup, READY_R)
if writer:
- loop.add_writer(fileno, wakeup, Ready.W)
+ loop.add_writer(fileno, wakeup, READY_W)
try:
await ev.wait()
finally:
if not timeout:
timeout = None
while True:
- reader = s & Wait.R
- writer = s & Wait.W
+ reader = s & WAIT_R
+ writer = s & WAIT_W
if not reader and not writer:
raise e.InternalError(f"bad poll status: {s}")
ev.clear()
ready = 0 # type: ignore[assignment]
if reader:
- loop.add_reader(fileno, wakeup, Ready.R)
+ loop.add_reader(fileno, wakeup, READY_R)
if writer:
- loop.add_writer(fileno, wakeup, Ready.W)
+ loop.add_writer(fileno, wakeup, READY_W)
try:
await wait_for(ev.wait(), timeout)
finally:
ev = fileevs[0][1]
ready = 0
if ev & ~select.EPOLLOUT:
- ready = Ready.R
+ ready = READY_R
if ev & ~select.EPOLLIN:
- ready |= Ready.W
+ ready |= READY_W
assert s & ready
s = gen.send(ready)
evmask = poll_evmasks[s]
cdef object WAIT_R = Wait.R
cdef object WAIT_RW = Wait.RW
cdef int READY_R = Ready.R
+cdef int READY_W = Ready.W
def connect(conninfo: str) -> PQGenConn[abc.PGconn]:
"""
while True:
ready = yield WAIT_RW
- if ready & Ready.R:
+ if ready & READY_R:
pgconn.consume_input()
with nogil:
cires = libpq.PQconsumeInput(pgconn_ptr)
res.append(r)
- if ready & Ready.W:
+ if ready & READY_W:
pgconn.flush()
if not commands:
break