Return the list of results returned by the database (whether success
or error).
"""
- cdef list results = []
+ yield from send(pgconn)
+ rv = yield from fetch_many(pgconn)
+ return rv
+
+
+def send(pq.PGconn pgconn) -> PQGen[None]:
+ """
+ Generator to send a query to the server without blocking.
+
+ The query must have already been sent using `pgconn.send_query()` or
+ similar. Flush the query and then return the result using nonblocking
+ functions.
+
+ After this generator has finished you may want to cycle using `fetch()`
+ to retrieve the results available.
+ """
cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
cdef int status
- cdef libpq.PGnotify *notify
- cdef libpq.PGresult *pgres
- cdef int cires, ibres
+ cdef int cires
- # Sending the query
while 1:
if libpq.PQflush(pgconn_ptr) == 0:
break
if 1 != cires:
raise e.OperationalError(
f"consuming input failed: {error_message(pgconn)}")
- continue
+
+def fetch_many(pq.PGconn pgconn) -> PQGen[List[PGresult]]:
+ """
+ Generator retrieving results from the database without blocking.
+
+ The query must have already been sent to the server, so pgconn.flush() has
+ already returned 0.
+
+ Return the list of results returned by the database (whether success
+ or error).
+ """
+ cdef list results = []
+ cdef int status
+ cdef pq.PGresult result
+ cdef libpq.PGresult *pgres
+
+ while 1:
+ result = yield from fetch(pgconn)
+ if result is None:
+ break
+ results.append(result)
+ pgres = result._pgresult_ptr
+
+ status = libpq.PQresultStatus(pgres)
+ if status in (libpq.PGRES_COPY_IN, libpq.PGRES_COPY_OUT, libpq.PGRES_COPY_BOTH):
+ # After entering copy mode the libpq will create a phony result
+ # for every request so let's break the endless loop.
+ break
+
+ return results
+
+
+def fetch(pq.PGconn pgconn) -> PQGen[Optional[PGresult]]:
+ """
+ Generator retrieving a single result from the database without blocking.
+
+ The query must have already been sent to the server, so pgconn.flush() has
+ already returned 0.
+
+ Return a result from the database (whether success or error).
+ """
+ cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
+ cdef libpq.PGnotify *notify
+ cdef int cires, ibres
cdef object notify_handler = pgconn.notify_handler
+ cdef libpq.PGresult *pgres
- # Fetching the result
while 1:
with nogil:
cires = libpq.PQconsumeInput(pgconn_ptr)
if 1 != cires:
raise e.OperationalError(
f"consuming input failed: {error_message(pgconn)}")
- if ibres:
- yield WAIT_R
- continue
-
- # Consume notifies
- if notify_handler is not None:
- while 1:
- pynotify = pgconn.notifies()
- if pynotify is None:
- break
- PyObject_CallFunctionObjArgs(
- notify_handler, <PyObject *>pynotify, NULL
- )
- else:
- while 1:
- notify = libpq.PQnotifies(pgconn_ptr)
- if notify is NULL:
- break
- libpq.PQfreemem(notify)
-
- pgres = libpq.PQgetResult(pgconn_ptr)
- if pgres is NULL:
+ if not ibres:
break
- results.append(pq.PGresult._from_ptr(pgres))
-
- status = libpq.PQresultStatus(pgres)
- if status in (libpq.PGRES_COPY_IN, libpq.PGRES_COPY_OUT, libpq.PGRES_COPY_BOTH):
- # After entering copy mode the libpq will create a phony result
- # for every request so let's break the endless loop.
- break
-
- return results
+ yield WAIT_R
+
+ # Consume notifies
+ if notify_handler is not None:
+ while 1:
+ pynotify = pgconn.notifies()
+ if pynotify is None:
+ break
+ PyObject_CallFunctionObjArgs(
+ notify_handler, <PyObject *>pynotify, NULL
+ )
+ else:
+ while 1:
+ notify = libpq.PQnotifies(pgconn_ptr)
+ if notify is NULL:
+ break
+ libpq.PQfreemem(notify)
+
+ pgres = libpq.PQgetResult(pgconn_ptr)
+ if pgres is NULL:
+ return None
+ return pq.PGresult._from_ptr(pgres)