]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Extract send(), fetch_many() and fetch() from generators.execute() C impl
authorDenis Laxalde <denis.laxalde@dalibo.com>
Wed, 29 Sep 2021 12:09:29 +0000 (14:09 +0200)
committerDenis Laxalde <denis.laxalde@dalibo.com>
Tue, 5 Oct 2021 07:31:44 +0000 (09:31 +0200)
We extract send(), fetch_many() and fetch() functions from execute() C
implementation. This way, the code structure is similar to the Python
implementation.

psycopg_c/psycopg_c/_psycopg.pyi
psycopg_c/psycopg_c/_psycopg/generators.pyx

index 9a9d316d6fbd271b955107a2da40133c488d0752..d5ec5efd248fea4ccb9bc0456a821e262fa212ea 100644 (file)
@@ -55,6 +55,9 @@ class Transformer(abc.AdaptContext):
 # Generators
 def connect(conninfo: str) -> abc.PQGenConn[PGconn]: ...
 def execute(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
+def send(pgconn: PGconn) -> abc.PQGen[None]: ...
+def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
+def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ...
 
 # Copy support
 def format_row_text(
index c57ac9b444cc6baeb67586381e51f91b708aaa79..4cb3478acf572195ebd2ba74e33f28bf5f9a0ec6 100644 (file)
@@ -66,14 +66,26 @@ def execute(pq.PGconn pgconn) -> PQGen[List[abc.PGresult]]:
     Return the list of results returned by the database (whether success
     or error).
     """
-    cdef list results = []
+    yield from send(pgconn)
+    rv = yield from fetch_many(pgconn)
+    return rv
+
+
+def send(pq.PGconn pgconn) -> PQGen[None]:
+    """
+    Generator to send a query to the server without blocking.
+
+    The query must have already been sent using `pgconn.send_query()` or
+    similar. Flush the query and then return the result using nonblocking
+    functions.
+
+    After this generator has finished you may want to cycle using `fetch()`
+    to retrieve the results available.
+    """
     cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
     cdef int status
-    cdef libpq.PGnotify *notify
-    cdef libpq.PGresult *pgres
-    cdef int cires, ibres
+    cdef int cires
 
-    # Sending the query
     while 1:
         if libpq.PQflush(pgconn_ptr) == 0:
             break
@@ -87,11 +99,54 @@ def execute(pq.PGconn pgconn) -> PQGen[List[abc.PGresult]]:
             if 1 != cires:
                 raise e.OperationalError(
                     f"consuming input failed: {error_message(pgconn)}")
-        continue
 
+
+def fetch_many(pq.PGconn pgconn) -> PQGen[List[PGresult]]:
+    """
+    Generator retrieving results from the database without blocking.
+
+    The query must have already been sent to the server, so pgconn.flush() has
+    already returned 0.
+
+    Return the list of results returned by the database (whether success
+    or error).
+    """
+    cdef list results = []
+    cdef int status
+    cdef pq.PGresult result
+    cdef libpq.PGresult *pgres
+
+    while 1:
+        result = yield from fetch(pgconn)
+        if result is None:
+            break
+        results.append(result)
+        pgres = result._pgresult_ptr
+
+        status = libpq.PQresultStatus(pgres)
+        if status in (libpq.PGRES_COPY_IN, libpq.PGRES_COPY_OUT, libpq.PGRES_COPY_BOTH):
+            # After entering copy mode the libpq will create a phony result
+            # for every request so let's break the endless loop.
+            break
+
+    return results
+
+
+def fetch(pq.PGconn pgconn) -> PQGen[Optional[PGresult]]:
+    """
+    Generator retrieving a single result from the database without blocking.
+
+    The query must have already been sent to the server, so pgconn.flush() has
+    already returned 0.
+
+    Return a result from the database (whether success or error).
+    """
+    cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
+    cdef libpq.PGnotify *notify
+    cdef int cires, ibres
     cdef object notify_handler = pgconn.notify_handler
+    cdef libpq.PGresult *pgres
 
-    # Fetching the result
     while 1:
         with nogil:
             cires = libpq.PQconsumeInput(pgconn_ptr)
@@ -101,35 +156,27 @@ def execute(pq.PGconn pgconn) -> PQGen[List[abc.PGresult]]:
         if 1 != cires:
             raise e.OperationalError(
                 f"consuming input failed: {error_message(pgconn)}")
-        if ibres:
-            yield WAIT_R
-            continue
-
-        # Consume notifies
-        if notify_handler is not None:
-            while 1:
-                pynotify = pgconn.notifies()
-                if pynotify is None:
-                    break
-                PyObject_CallFunctionObjArgs(
-                    notify_handler, <PyObject *>pynotify, NULL
-                )
-        else:
-            while 1:
-                notify = libpq.PQnotifies(pgconn_ptr)
-                if notify is NULL:
-                    break
-                libpq.PQfreemem(notify)
-
-        pgres = libpq.PQgetResult(pgconn_ptr)
-        if pgres is NULL:
+        if not ibres:
             break
-        results.append(pq.PGresult._from_ptr(pgres))
-
-        status = libpq.PQresultStatus(pgres)
-        if status in (libpq.PGRES_COPY_IN, libpq.PGRES_COPY_OUT, libpq.PGRES_COPY_BOTH):
-            # After entering copy mode the libpq will create a phony result
-            # for every request so let's break the endless loop.
-            break
-
-    return results
+        yield WAIT_R
+
+    # Consume notifies
+    if notify_handler is not None:
+        while 1:
+            pynotify = pgconn.notifies()
+            if pynotify is None:
+                break
+            PyObject_CallFunctionObjArgs(
+                notify_handler, <PyObject *>pynotify, NULL
+            )
+    else:
+        while 1:
+            notify = libpq.PQnotifies(pgconn_ptr)
+            if notify is NULL:
+                break
+            libpq.PQfreemem(notify)
+
+    pgres = libpq.PQgetResult(pgconn_ptr)
+    if pgres is NULL:
+        return None
+    return pq.PGresult._from_ptr(pgres)