raise NotImplementedError()
conninfo = make_conninfo(conninfo, **kwargs)
gen = cls._connect_gen(conninfo)
- pgconn = wait_select(gen)
+ pgconn = cls.wait(gen)
return cls(pgconn)
def commit(self):
return
self.pgconn.send_query(command)
- (pgres,) = wait_select(self._exec_gen(self.pgconn))
+ (pgres,) = self.wait(self._exec_gen(self.pgconn))
if pgres.status != pq.ExecStatus.COMMAND_OK:
raise exc.OperationalError(
f"error on {command.decode('utf8')}:"
f" {pq.error_message(pgres)}"
)
+ @classmethod
+ def wait(cls, gen):
+ return wait_select(gen)
+
class AsyncConnection(BaseConnection):
"""
async def connect(cls, conninfo, **kwargs):
conninfo = make_conninfo(conninfo, **kwargs)
gen = cls._connect_gen(conninfo)
- pgconn = await wait_async(gen)
+ pgconn = await cls.wait(gen)
return cls(pgconn)
async def commit(self):
return
self.pgconn.send_query(command)
- (pgres,) = await wait_async(self._exec_gen(self.pgconn))
+ (pgres,) = await self.wait(self._exec_gen(self.pgconn))
if pgres.status != pq.ExecStatus.COMMAND_OK:
raise exc.OperationalError(
f"error on {command.decode('utf8')}:"
f" {pq.error_message(pgres)}"
)
+
+ @classmethod
+ async def wait(cls, gen):
+ return await wait_async(gen)
from select import select
-from psycopg3.waiting import wait_select
-
def test_send_query(pq, pgconn):
# This test shows how to process an async query in all its glory
b"/* %s */ select pg_sleep(0.01); select 1 as foo;"
% (b"x" * 1_000_000)
)
- results = wait_select(conn._exec_gen(conn.pgconn))
+ results = conn.wait(conn._exec_gen(conn.pgconn))
assert len(results) == 2
assert results[0].nfields == 1
def test_send_query_params(pq, conn):
res = conn.pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
- (res,) = wait_select(conn._exec_gen(conn.pgconn))
+ (res,) = conn.wait(conn._exec_gen(conn.pgconn))
assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"8"