@classmethod
def _connect_gen(cls, conninfo):
"""
- Generator to create a database connection using without blocking.
+ Generator to create a database connection without blocking.
Yield pairs (fileno, `Wait`) whenever an operation would block. The
generator can be restarted sending the appropriate `Ready` state when
Return what the generator eventually returned.
"""
- # Use a queue to block and restart after the fd state changes.
+ # Use an event to block and restart after the fd state changes.
# Not sure this is the best implementation but it's a start.
- e = Event()
+ ev = Event()
loop = get_event_loop()
ready = None
def wakeup(state):
nonlocal ready
ready = state
- e.set()
+ ev.set()
try:
while 1:
fd, s = next(gen)
- e.clear()
+ ev.clear()
if s is Wait.R:
loop.add_reader(fd, wakeup, Ready.R)
- await e.wait()
+ await ev.wait()
loop.remove_reader(fd)
gen.send(ready)
elif s is Wait.W:
loop.add_writer(fd, wakeup, Ready.W)
- await e.wait()
+ await ev.wait()
loop.remove_writer(fd)
gen.send(ready)
elif s is Wait.RW:
loop.add_reader(fd, wakeup, Ready.R)
loop.add_writer(fd, wakeup, Ready.W)
- await e.wait()
+ await ev.wait()
loop.remove_reader(fd)
loop.remove_writer(fd)
gen.send(ready)
def pgconn(pq, dsn):
"""Return a PGconn connection open to `--test-dsn`."""
conn = pq.PGconn.connect(dsn.encode("utf8"))
- if conn.status != 0:
+ if conn.status != pq.ConnStatus.OK:
pytest.fail(
f"bad connection: {conn.error_message.decode('utf8', 'replace')}"
)