]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
exec generator split in send and receive
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 11 Apr 2020 15:41:11 +0000 (03:41 +1200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 11 Apr 2020 15:41:11 +0000 (03:41 +1200)
It was actually not needed: I thought erroneously that sending prepared
statements only needed the send bit. However the change is ok and could
also clean up the types around these generator adding a generic PQGen[]
type (with different return values).

psycopg3/connection.py
psycopg3/cursor.py
psycopg3/generators.py
psycopg3/waiting.py

index c3019029977bc81ce439d114543929067d8fb773..769afc9101780a97a37ef1a58be195ae7d573169 100644 (file)
@@ -8,7 +8,7 @@ import codecs
 import logging
 import asyncio
 import threading
-from typing import Any, Generator, Optional, Tuple, Type, TypeVar
+from typing import Any, Optional, Type
 from typing import cast, TYPE_CHECKING
 
 from . import pq
@@ -16,14 +16,13 @@ from . import errors as e
 from . import cursor
 from . import generators
 from .conninfo import make_conninfo
-from .waiting import wait, wait_async, Wait, Ready
+from .waiting import wait, wait_async
 
 logger = logging.getLogger(__name__)
 
-RV = TypeVar("RV")
-
 if TYPE_CHECKING:
     from .adapt import DumpersMap, LoadersMap
+    from .generators import PQGen, RV
 
 
 class BaseConnection:
@@ -164,11 +163,7 @@ class Connection(BaseConnection):
                 )
 
     @classmethod
-    def wait(
-        cls,
-        gen: Generator[Tuple[int, Wait], Ready, RV],
-        timeout: Optional[float] = 0.1,
-    ) -> RV:
+    def wait(cls, gen: "PQGen[RV]", timeout: Optional[float] = 0.1) -> "RV":
         return wait(gen, timeout=timeout)
 
     def set_client_encoding(self, value: str) -> None:
@@ -232,7 +227,7 @@ class AsyncConnection(BaseConnection):
                 )
 
     @classmethod
-    async def wait(cls, gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV:
+    async def wait(cls, gen: "PQGen[RV]") -> "RV":
         return await wait_async(gen)
 
     async def set_client_encoding(self, value: str) -> None:
index 4b7dd96bb6afd192262be2ad84d8b8fa3104e035..545732c22134cddda7859431239d34089eebe899 100644 (file)
@@ -17,7 +17,7 @@ from .utils.typing import Query, Params
 if TYPE_CHECKING:
     from .adapt import DumpersMap, LoadersMap, Transformer
     from .connection import BaseConnection, Connection, AsyncConnection
-    from .generators import QueryGen
+    from .generators import PQGen
 
 
 class Column(Sequence[Any]):
@@ -137,7 +137,7 @@ class BaseCursor:
 
     def _execute_send(
         self, query: Query, vars: Optional[Params]
-    ) -> "QueryGen":
+    ) -> "PQGen[List[pq.PGresult]]":
         """
         Implement part of execute() before waiting common to sync and async
         """
index 1899eaaf3a993ca1b9a56e11227bf8284d1cf1f5..54d69d290f6aaba1473b332b9cfae1350a64bcce 100644 (file)
@@ -6,30 +6,33 @@ waiting for the socket to be ready. This module contains the code to execute
 the operations, yielding a polling state whenever there is to wait. The
 functions in the `waiting` module are the ones who wait more or less
 cooperatively for the socket to be ready and make these generators continue.
+
+All these generators yield pairs (fileno, `Wait`) whenever an operation would
+block. The generator can be restarted sending the appropriate `Ready` state
+when the file descriptor is ready.
+
 """
 
 # Copyright (C) 2020 The Psycopg Team
 
 import logging
-from typing import Generator, List, Tuple
+from typing import Generator, List, Tuple, TypeVar
 from .waiting import Wait, Ready
 
 from . import pq
 from . import errors as e
 
-ConnectGen = Generator[Tuple[int, Wait], Ready, pq.PGconn]
-QueryGen = Generator[Tuple[int, Wait], Ready, List[pq.PGresult]]
+# Generic type of a libpq protocol generator.
+RV = TypeVar("RV")
+PQGen = Generator[Tuple[int, Wait], Ready, RV]
 
 logger = logging.getLogger(__name__)
 
 
-def connect(conninfo: str) -> ConnectGen:
+def connect(conninfo: str) -> PQGen[pq.PGconn]:
     """
     Generator to create a database connection without blocking.
 
-    Yield pairs (fileno, `Wait`) whenever an operation would block. The
-    generator can be restarted sending the appropriate `Ready` state when
-    the file descriptor is ready.
     """
     conn = pq.PGconn.connect_start(conninfo.encode("utf8"))
     logger.debug("connection started, status %s", conn.status.name)
@@ -58,23 +61,33 @@ def connect(conninfo: str) -> ConnectGen:
     return conn
 
 
-def execute(pgconn: pq.PGconn) -> QueryGen:
+def execute(pgconn: pq.PGconn) -> PQGen[List[pq.PGresult]]:
     """
-    Generator returning query results without blocking.
+    Generator sending a query and returning results without blocking.
 
     The query must have already been sent using `pgconn.send_query()` or
     similar. Flush the query and then return the result using nonblocking
     functions.
 
-    Yield pairs (fileno, `Wait`) whenever an operation would block. The
-    generator can be restarted sending the appropriate `Ready` state when
-    the file descriptor is ready.
-
     Return the list of results returned by the database (whether success
     or error).
     """
-    results: List[pq.PGresult] = []
+    yield from send(pgconn)
+    rv = yield from fetch(pgconn)
+    return rv
+
+
+def send(pgconn: pq.PGconn) -> PQGen[None]:
+    """
+    Generator to send a query to the server without blocking.
+
+    The query must have already been sent using `pgconn.send_query()` or
+    similar. Flush the query and then return the result using nonblocking
+    functions.
 
+    After this generator has finished you may want to cycle using `fetch()`
+    to retrieve the results available.
+    """
     while 1:
         f = pgconn.flush()
         if f == 0:
@@ -85,19 +98,28 @@ def execute(pgconn: pq.PGconn) -> QueryGen:
             pgconn.consume_input()
         continue
 
+
+def fetch(pgconn: pq.PGconn) -> PQGen[List[pq.PGresult]]:
+    """
+    Generator retrieving results from the database without blocking.
+
+    The query must have already been sent to the server, so pgconn.flush() has
+    already returned 0.
+
+    Return the list of results returned by the database (whether success
+    or error).
+    """
+    S = pq.ExecStatus
+    results: List[pq.PGresult] = []
     while 1:
         pgconn.consume_input()
         if pgconn.is_busy():
-            ready = yield pgconn.socket, Wait.R
+            yield pgconn.socket, Wait.R
         res = pgconn.get_result()
         if res is None:
             break
         results.append(res)
-        if res.status in (
-            pq.ExecStatus.COPY_IN,
-            pq.ExecStatus.COPY_OUT,
-            pq.ExecStatus.COPY_BOTH,
-        ):
+        if res.status in (S.COPY_IN, S.COPY_OUT, S.COPY_BOTH):
             # After entering copy mode the libpq will create a phony result
             # for every request so let's break the endless loop.
             break
index 0f52d1a56cc8b9ac80b0a1957067c2d856546e59..967eb0b5b6e8449f5706ad3cd96e76a428fec2c5 100644 (file)
@@ -1,17 +1,24 @@
 """
 Code concerned with waiting in different contexts (blocking, async, etc).
+
+These functions are designed to consume the generators returned by the
+`generators` module function and to return their final value.
+
 """
 
 # Copyright (C) 2020 The Psycopg Team
 
 
 from enum import IntEnum
-from typing import Generator, Optional, Tuple, TypeVar
+from typing import Optional, TYPE_CHECKING
 from asyncio import get_event_loop, Event
 from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
 
 from . import errors as e
 
+if TYPE_CHECKING:
+    from .generators import PQGen, RV
+
 
 class Wait(IntEnum):
     R = EVENT_READ
@@ -24,13 +31,7 @@ class Ready(IntEnum):
     W = EVENT_WRITE
 
 
-RV = TypeVar("RV")
-
-
-def wait(
-    gen: Generator[Tuple[int, Wait], Ready, RV],
-    timeout: Optional[float] = None,
-) -> RV:
+def wait(gen: "PQGen[RV]", timeout: Optional[float] = None) -> "RV":
     """
     Wait for a generator using the best option available on the platform.
 
@@ -55,11 +56,11 @@ def wait(
             fd, s = gen.send(ready[0][1])
 
     except StopIteration as ex:
-        rv: RV = ex.args[0]
+        rv: "RV" = ex.args[0]
         return rv
 
 
-async def wait_async(gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV:
+async def wait_async(gen: "PQGen[RV]") -> "RV":
     """
     Coroutine waiting for a generator to complete.
 
@@ -103,5 +104,5 @@ async def wait_async(gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV:
             fd, s = gen.send(ready)
 
     except StopIteration as ex:
-        rv: RV = ex.args[0]
+        rv: "RV" = ex.args[0]
         return rv