From: Denis Laxalde Date: Wed, 29 Sep 2021 12:09:29 +0000 (+0200) Subject: Extract send(), fetch_many() and fetch() from generators.execute() C impl X-Git-Tag: 3.0~28^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2c6ee71c8e43c927016c2a2a6ab1fd13414eccd2;p=thirdparty%2Fpsycopg.git Extract send(), fetch_many() and fetch() from generators.execute() C impl We extract send(), fetch_many() and fetch() functions from execute() C implementation. This way, the code structure is similar to the Python implementation. --- diff --git a/psycopg_c/psycopg_c/_psycopg.pyi b/psycopg_c/psycopg_c/_psycopg.pyi index 9a9d316d6..d5ec5efd2 100644 --- a/psycopg_c/psycopg_c/_psycopg.pyi +++ b/psycopg_c/psycopg_c/_psycopg.pyi @@ -55,6 +55,9 @@ class Transformer(abc.AdaptContext): # Generators def connect(conninfo: str) -> abc.PQGenConn[PGconn]: ... def execute(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ... +def send(pgconn: PGconn) -> abc.PQGen[None]: ... +def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ... +def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ... # Copy support def format_row_text( diff --git a/psycopg_c/psycopg_c/_psycopg/generators.pyx b/psycopg_c/psycopg_c/_psycopg/generators.pyx index c57ac9b44..4cb3478ac 100644 --- a/psycopg_c/psycopg_c/_psycopg/generators.pyx +++ b/psycopg_c/psycopg_c/_psycopg/generators.pyx @@ -66,14 +66,26 @@ def execute(pq.PGconn pgconn) -> PQGen[List[abc.PGresult]]: Return the list of results returned by the database (whether success or error). """ - cdef list results = [] + yield from send(pgconn) + rv = yield from fetch_many(pgconn) + return rv + + +def send(pq.PGconn pgconn) -> PQGen[None]: + """ + Generator to send a query to the server without blocking. + + The query must have already been sent using `pgconn.send_query()` or + similar. Flush the query and then return the result using nonblocking + functions. + + After this generator has finished you may want to cycle using `fetch()` + to retrieve the results available. + """ cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr cdef int status - cdef libpq.PGnotify *notify - cdef libpq.PGresult *pgres - cdef int cires, ibres + cdef int cires - # Sending the query while 1: if libpq.PQflush(pgconn_ptr) == 0: break @@ -87,11 +99,54 @@ def execute(pq.PGconn pgconn) -> PQGen[List[abc.PGresult]]: if 1 != cires: raise e.OperationalError( f"consuming input failed: {error_message(pgconn)}") - continue + +def fetch_many(pq.PGconn pgconn) -> PQGen[List[PGresult]]: + """ + Generator retrieving results from the database without blocking. + + The query must have already been sent to the server, so pgconn.flush() has + already returned 0. + + Return the list of results returned by the database (whether success + or error). + """ + cdef list results = [] + cdef int status + cdef pq.PGresult result + cdef libpq.PGresult *pgres + + while 1: + result = yield from fetch(pgconn) + if result is None: + break + results.append(result) + pgres = result._pgresult_ptr + + status = libpq.PQresultStatus(pgres) + if status in (libpq.PGRES_COPY_IN, libpq.PGRES_COPY_OUT, libpq.PGRES_COPY_BOTH): + # After entering copy mode the libpq will create a phony result + # for every request so let's break the endless loop. + break + + return results + + +def fetch(pq.PGconn pgconn) -> PQGen[Optional[PGresult]]: + """ + Generator retrieving a single result from the database without blocking. + + The query must have already been sent to the server, so pgconn.flush() has + already returned 0. + + Return a result from the database (whether success or error). + """ + cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr + cdef libpq.PGnotify *notify + cdef int cires, ibres cdef object notify_handler = pgconn.notify_handler + cdef libpq.PGresult *pgres - # Fetching the result while 1: with nogil: cires = libpq.PQconsumeInput(pgconn_ptr) @@ -101,35 +156,27 @@ def execute(pq.PGconn pgconn) -> PQGen[List[abc.PGresult]]: if 1 != cires: raise e.OperationalError( f"consuming input failed: {error_message(pgconn)}") - if ibres: - yield WAIT_R - continue - - # Consume notifies - if notify_handler is not None: - while 1: - pynotify = pgconn.notifies() - if pynotify is None: - break - PyObject_CallFunctionObjArgs( - notify_handler, pynotify, NULL - ) - else: - while 1: - notify = libpq.PQnotifies(pgconn_ptr) - if notify is NULL: - break - libpq.PQfreemem(notify) - - pgres = libpq.PQgetResult(pgconn_ptr) - if pgres is NULL: + if not ibres: break - results.append(pq.PGresult._from_ptr(pgres)) - - status = libpq.PQresultStatus(pgres) - if status in (libpq.PGRES_COPY_IN, libpq.PGRES_COPY_OUT, libpq.PGRES_COPY_BOTH): - # After entering copy mode the libpq will create a phony result - # for every request so let's break the endless loop. - break - - return results + yield WAIT_R + + # Consume notifies + if notify_handler is not None: + while 1: + pynotify = pgconn.notifies() + if pynotify is None: + break + PyObject_CallFunctionObjArgs( + notify_handler, pynotify, NULL + ) + else: + while 1: + notify = libpq.PQnotifies(pgconn_ptr) + if notify is NULL: + break + libpq.PQfreemem(notify) + + pgres = libpq.PQgetResult(pgconn_ptr) + if pgres is NULL: + return None + return pq.PGresult._from_ptr(pgres)