# PQprint: pretty useless
+# 33.4. Asynchronous Command Processing
+
+PQsendQuery = pq.PQsendQuery
+PQsendQuery.argtypes = [PGconn_ptr, c_char_p]
+PQsendQuery.restype = c_int
+
+# TODO: PQsendQueryParams PQsendPrepare PQsendQueryPrepared
+# PQsendDescribePrepared PQsendDescribePortal
+
+PQgetResult = pq.PQgetResult
+PQgetResult.argtypes = [PGconn_ptr]
+PQgetResult.restype = PGresult_ptr
+
+PQconsumeInput = pq.PQconsumeInput
+PQconsumeInput.argtypes = [PGconn_ptr]
+PQconsumeInput.restype = c_int
+
+PQisBusy = pq.PQisBusy
+PQisBusy.argtypes = [PGconn_ptr]
+PQisBusy.restype = c_int
+
+PQsetnonblocking = pq.PQsetnonblocking
+PQsetnonblocking.argtypes = [PGconn_ptr, c_int]
+PQsetnonblocking.restype = c_int
+
+PQisnonblocking = pq.PQisnonblocking
+PQisnonblocking.argtypes = [PGconn_ptr]
+PQisnonblocking.restype = c_int
+
+PQflush = pq.PQflush
+PQflush.argtypes = [PGconn_ptr]
+PQflush.restype == c_int
+
+
# 33.11. Miscellaneous Functions
PQfreemem = pq.PQfreemem
raise MemoryError("couldn't allocate PGresult")
return PGresult(rv)
+ def send_query(self, command):
+ if not isinstance(command, bytes):
+ raise TypeError(
+ "bytes expected, got %s instead" % type(command).__name__
+ )
+ return impl.PQsendQuery(self.pgconn_ptr, command)
+
def exec_params(
self,
command,
raise MemoryError("couldn't allocate PGresult")
return PGresult(rv)
+ def get_result(self):
+ rv = impl.PQgetResult(self.pgconn_ptr)
+ return PGresult(rv) if rv else None
+
+ def consume_input(self):
+ return impl.PQconsumeInput(self.pgconn_ptr)
+
+ def is_busy(self):
+ return impl.PQisBusy(self.pgconn_ptr)
+
+ def is_non_blocking(self):
+ return impl.PQisnonblocking(self.pgconn_ptr)
+
+ def set_non_blocking(self, arg):
+ return impl.PQsetnonblocking(self.pgconn_ptr, arg)
+
+ def flush(self):
+ return impl.PQflush(self.pgconn_ptr)
+
class PGresult:
__slots__ = ("pgresult_ptr",)
--- /dev/null
+from select import select
+
+
+def test_send_query(pq, pgconn):
+ # This test shows how to process an async query in all its glory
+ pgconn.set_non_blocking(1)
+
+ # Long query to make sure we have to wait on send
+ pgconn.send_query(
+ b"/* %s */ select pg_sleep(0.01); select 1 as foo;"
+ % (b"x" * 1_000_000)
+ )
+
+ # send loop
+ waited_on_send = 0
+ while 1:
+ f = pgconn.flush()
+ assert f != -1
+ if f == 0:
+ break
+
+ waited_on_send += 1
+
+ rl, wl, xl = select([pgconn.socket], [pgconn.socket], [])
+ assert not (rl and wl)
+ if wl:
+ continue # call flush again()
+ if rl:
+ assert pgconn.consume_input() == 1, pgconn.error_message
+ continue
+
+ assert waited_on_send
+
+ # read loop
+ results = []
+ while 1:
+ assert pgconn.consume_input() == 1, pgconn.error_message
+ if pgconn.is_busy():
+ select([pgconn.socket], [], [])
+ continue
+ res = pgconn.get_result()
+ if res is None:
+ break
+ assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ results.append(res)
+
+ assert len(results) == 2
+ assert results[0].nfields == 1
+ assert results[0].fname(0) == b"pg_sleep"
+ assert results[0].get_value(0, 0) == b""
+ assert results[1].nfields == 1
+ assert results[1].fname(0) == b"foo"
+ assert results[1].get_value(0, 0) == b"1"