From: Daniele Varrazzo Date: Sat, 11 Apr 2020 15:41:11 +0000 (+1200) Subject: exec generator split in send and receive X-Git-Tag: 3.0.dev0~568 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=83be6d2ca85df3182607e87819639db4bb3e4c01;p=thirdparty%2Fpsycopg.git exec generator split in send and receive It was actually not needed: I thought erroneously that sending prepared statements only needed the send bit. However the change is ok and could also clean up the types around these generator adding a generic PQGen[] type (with different return values). --- diff --git a/psycopg3/connection.py b/psycopg3/connection.py index c30190299..769afc910 100644 --- a/psycopg3/connection.py +++ b/psycopg3/connection.py @@ -8,7 +8,7 @@ import codecs import logging import asyncio import threading -from typing import Any, Generator, Optional, Tuple, Type, TypeVar +from typing import Any, Optional, Type from typing import cast, TYPE_CHECKING from . import pq @@ -16,14 +16,13 @@ from . import errors as e from . import cursor from . import generators from .conninfo import make_conninfo -from .waiting import wait, wait_async, Wait, Ready +from .waiting import wait, wait_async logger = logging.getLogger(__name__) -RV = TypeVar("RV") - if TYPE_CHECKING: from .adapt import DumpersMap, LoadersMap + from .generators import PQGen, RV class BaseConnection: @@ -164,11 +163,7 @@ class Connection(BaseConnection): ) @classmethod - def wait( - cls, - gen: Generator[Tuple[int, Wait], Ready, RV], - timeout: Optional[float] = 0.1, - ) -> RV: + def wait(cls, gen: "PQGen[RV]", timeout: Optional[float] = 0.1) -> "RV": return wait(gen, timeout=timeout) def set_client_encoding(self, value: str) -> None: @@ -232,7 +227,7 @@ class AsyncConnection(BaseConnection): ) @classmethod - async def wait(cls, gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV: + async def wait(cls, gen: "PQGen[RV]") -> "RV": return await wait_async(gen) async def set_client_encoding(self, value: str) -> None: diff --git a/psycopg3/cursor.py b/psycopg3/cursor.py index 4b7dd96bb..545732c22 100644 --- a/psycopg3/cursor.py +++ b/psycopg3/cursor.py @@ -17,7 +17,7 @@ from .utils.typing import Query, Params if TYPE_CHECKING: from .adapt import DumpersMap, LoadersMap, Transformer from .connection import BaseConnection, Connection, AsyncConnection - from .generators import QueryGen + from .generators import PQGen class Column(Sequence[Any]): @@ -137,7 +137,7 @@ class BaseCursor: def _execute_send( self, query: Query, vars: Optional[Params] - ) -> "QueryGen": + ) -> "PQGen[List[pq.PGresult]]": """ Implement part of execute() before waiting common to sync and async """ diff --git a/psycopg3/generators.py b/psycopg3/generators.py index 1899eaaf3..54d69d290 100644 --- a/psycopg3/generators.py +++ b/psycopg3/generators.py @@ -6,30 +6,33 @@ waiting for the socket to be ready. This module contains the code to execute the operations, yielding a polling state whenever there is to wait. The functions in the `waiting` module are the ones who wait more or less cooperatively for the socket to be ready and make these generators continue. + +All these generators yield pairs (fileno, `Wait`) whenever an operation would +block. The generator can be restarted sending the appropriate `Ready` state +when the file descriptor is ready. + """ # Copyright (C) 2020 The Psycopg Team import logging -from typing import Generator, List, Tuple +from typing import Generator, List, Tuple, TypeVar from .waiting import Wait, Ready from . import pq from . import errors as e -ConnectGen = Generator[Tuple[int, Wait], Ready, pq.PGconn] -QueryGen = Generator[Tuple[int, Wait], Ready, List[pq.PGresult]] +# Generic type of a libpq protocol generator. +RV = TypeVar("RV") +PQGen = Generator[Tuple[int, Wait], Ready, RV] logger = logging.getLogger(__name__) -def connect(conninfo: str) -> ConnectGen: +def connect(conninfo: str) -> PQGen[pq.PGconn]: """ Generator to create a database connection without blocking. - Yield pairs (fileno, `Wait`) whenever an operation would block. The - generator can be restarted sending the appropriate `Ready` state when - the file descriptor is ready. """ conn = pq.PGconn.connect_start(conninfo.encode("utf8")) logger.debug("connection started, status %s", conn.status.name) @@ -58,23 +61,33 @@ def connect(conninfo: str) -> ConnectGen: return conn -def execute(pgconn: pq.PGconn) -> QueryGen: +def execute(pgconn: pq.PGconn) -> PQGen[List[pq.PGresult]]: """ - Generator returning query results without blocking. + Generator sending a query and returning results without blocking. The query must have already been sent using `pgconn.send_query()` or similar. Flush the query and then return the result using nonblocking functions. - Yield pairs (fileno, `Wait`) whenever an operation would block. The - generator can be restarted sending the appropriate `Ready` state when - the file descriptor is ready. - Return the list of results returned by the database (whether success or error). """ - results: List[pq.PGresult] = [] + yield from send(pgconn) + rv = yield from fetch(pgconn) + return rv + + +def send(pgconn: pq.PGconn) -> PQGen[None]: + """ + Generator to send a query to the server without blocking. + + The query must have already been sent using `pgconn.send_query()` or + similar. Flush the query and then return the result using nonblocking + functions. + After this generator has finished you may want to cycle using `fetch()` + to retrieve the results available. + """ while 1: f = pgconn.flush() if f == 0: @@ -85,19 +98,28 @@ def execute(pgconn: pq.PGconn) -> QueryGen: pgconn.consume_input() continue + +def fetch(pgconn: pq.PGconn) -> PQGen[List[pq.PGresult]]: + """ + Generator retrieving results from the database without blocking. + + The query must have already been sent to the server, so pgconn.flush() has + already returned 0. + + Return the list of results returned by the database (whether success + or error). + """ + S = pq.ExecStatus + results: List[pq.PGresult] = [] while 1: pgconn.consume_input() if pgconn.is_busy(): - ready = yield pgconn.socket, Wait.R + yield pgconn.socket, Wait.R res = pgconn.get_result() if res is None: break results.append(res) - if res.status in ( - pq.ExecStatus.COPY_IN, - pq.ExecStatus.COPY_OUT, - pq.ExecStatus.COPY_BOTH, - ): + if res.status in (S.COPY_IN, S.COPY_OUT, S.COPY_BOTH): # After entering copy mode the libpq will create a phony result # for every request so let's break the endless loop. break diff --git a/psycopg3/waiting.py b/psycopg3/waiting.py index 0f52d1a56..967eb0b5b 100644 --- a/psycopg3/waiting.py +++ b/psycopg3/waiting.py @@ -1,17 +1,24 @@ """ Code concerned with waiting in different contexts (blocking, async, etc). + +These functions are designed to consume the generators returned by the +`generators` module function and to return their final value. + """ # Copyright (C) 2020 The Psycopg Team from enum import IntEnum -from typing import Generator, Optional, Tuple, TypeVar +from typing import Optional, TYPE_CHECKING from asyncio import get_event_loop, Event from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from . import errors as e +if TYPE_CHECKING: + from .generators import PQGen, RV + class Wait(IntEnum): R = EVENT_READ @@ -24,13 +31,7 @@ class Ready(IntEnum): W = EVENT_WRITE -RV = TypeVar("RV") - - -def wait( - gen: Generator[Tuple[int, Wait], Ready, RV], - timeout: Optional[float] = None, -) -> RV: +def wait(gen: "PQGen[RV]", timeout: Optional[float] = None) -> "RV": """ Wait for a generator using the best option available on the platform. @@ -55,11 +56,11 @@ def wait( fd, s = gen.send(ready[0][1]) except StopIteration as ex: - rv: RV = ex.args[0] + rv: "RV" = ex.args[0] return rv -async def wait_async(gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV: +async def wait_async(gen: "PQGen[RV]") -> "RV": """ Coroutine waiting for a generator to complete. @@ -103,5 +104,5 @@ async def wait_async(gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV: fd, s = gen.send(ready) except StopIteration as ex: - rv: RV = ex.args[0] + rv: "RV" = ex.args[0] return rv