import logging
import asyncio
import threading
-from typing import Any, Generator, Optional, Tuple, Type, TypeVar
+from typing import Any, Optional, Type
from typing import cast, TYPE_CHECKING
from . import pq
from . import cursor
from . import generators
from .conninfo import make_conninfo
-from .waiting import wait, wait_async, Wait, Ready
+from .waiting import wait, wait_async
logger = logging.getLogger(__name__)
-RV = TypeVar("RV")
-
if TYPE_CHECKING:
from .adapt import DumpersMap, LoadersMap
+ from .generators import PQGen, RV
class BaseConnection:
)
@classmethod
- def wait(
- cls,
- gen: Generator[Tuple[int, Wait], Ready, RV],
- timeout: Optional[float] = 0.1,
- ) -> RV:
+ def wait(cls, gen: "PQGen[RV]", timeout: Optional[float] = 0.1) -> "RV":
return wait(gen, timeout=timeout)
def set_client_encoding(self, value: str) -> None:
)
@classmethod
- async def wait(cls, gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV:
+ async def wait(cls, gen: "PQGen[RV]") -> "RV":
return await wait_async(gen)
async def set_client_encoding(self, value: str) -> None:
the operations, yielding a polling state whenever there is to wait. The
functions in the `waiting` module are the ones who wait more or less
cooperatively for the socket to be ready and make these generators continue.
+
+All these generators yield pairs (fileno, `Wait`) whenever an operation would
+block. The generator can be restarted sending the appropriate `Ready` state
+when the file descriptor is ready.
+
"""
# Copyright (C) 2020 The Psycopg Team
import logging
-from typing import Generator, List, Tuple
+from typing import Generator, List, Tuple, TypeVar
from .waiting import Wait, Ready
from . import pq
from . import errors as e
-ConnectGen = Generator[Tuple[int, Wait], Ready, pq.PGconn]
-QueryGen = Generator[Tuple[int, Wait], Ready, List[pq.PGresult]]
+# Generic type of a libpq protocol generator.
+RV = TypeVar("RV")
+PQGen = Generator[Tuple[int, Wait], Ready, RV]
logger = logging.getLogger(__name__)
-def connect(conninfo: str) -> ConnectGen:
+def connect(conninfo: str) -> PQGen[pq.PGconn]:
"""
Generator to create a database connection without blocking.
- Yield pairs (fileno, `Wait`) whenever an operation would block. The
- generator can be restarted sending the appropriate `Ready` state when
- the file descriptor is ready.
"""
conn = pq.PGconn.connect_start(conninfo.encode("utf8"))
logger.debug("connection started, status %s", conn.status.name)
return conn
-def execute(pgconn: pq.PGconn) -> QueryGen:
+def execute(pgconn: pq.PGconn) -> PQGen[List[pq.PGresult]]:
"""
- Generator returning query results without blocking.
+ Generator sending a query and returning results without blocking.
The query must have already been sent using `pgconn.send_query()` or
similar. Flush the query and then return the result using nonblocking
functions.
- Yield pairs (fileno, `Wait`) whenever an operation would block. The
- generator can be restarted sending the appropriate `Ready` state when
- the file descriptor is ready.
-
Return the list of results returned by the database (whether success
or error).
"""
- results: List[pq.PGresult] = []
+ yield from send(pgconn)
+ rv = yield from fetch(pgconn)
+ return rv
+
+
+def send(pgconn: pq.PGconn) -> PQGen[None]:
+ """
+ Generator to send a query to the server without blocking.
+
+ The query must have already been sent using `pgconn.send_query()` or
+ similar. Flush the query and then return the result using nonblocking
+ functions.
+ After this generator has finished you may want to cycle using `fetch()`
+ to retrieve the results available.
+ """
while 1:
f = pgconn.flush()
if f == 0:
pgconn.consume_input()
continue
+
+def fetch(pgconn: pq.PGconn) -> PQGen[List[pq.PGresult]]:
+ """
+ Generator retrieving results from the database without blocking.
+
+ The query must have already been sent to the server, so pgconn.flush() has
+ already returned 0.
+
+ Return the list of results returned by the database (whether success
+ or error).
+ """
+ S = pq.ExecStatus
+ results: List[pq.PGresult] = []
while 1:
pgconn.consume_input()
if pgconn.is_busy():
- ready = yield pgconn.socket, Wait.R
+ yield pgconn.socket, Wait.R
res = pgconn.get_result()
if res is None:
break
results.append(res)
- if res.status in (
- pq.ExecStatus.COPY_IN,
- pq.ExecStatus.COPY_OUT,
- pq.ExecStatus.COPY_BOTH,
- ):
+ if res.status in (S.COPY_IN, S.COPY_OUT, S.COPY_BOTH):
# After entering copy mode the libpq will create a phony result
# for every request so let's break the endless loop.
break
"""
Code concerned with waiting in different contexts (blocking, async, etc).
+
+These functions are designed to consume the generators returned by the
+`generators` module function and to return their final value.
+
"""
# Copyright (C) 2020 The Psycopg Team
from enum import IntEnum
-from typing import Generator, Optional, Tuple, TypeVar
+from typing import Optional, TYPE_CHECKING
from asyncio import get_event_loop, Event
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from . import errors as e
+if TYPE_CHECKING:
+ from .generators import PQGen, RV
+
class Wait(IntEnum):
R = EVENT_READ
W = EVENT_WRITE
-RV = TypeVar("RV")
-
-
-def wait(
- gen: Generator[Tuple[int, Wait], Ready, RV],
- timeout: Optional[float] = None,
-) -> RV:
+def wait(gen: "PQGen[RV]", timeout: Optional[float] = None) -> "RV":
"""
Wait for a generator using the best option available on the platform.
fd, s = gen.send(ready[0][1])
except StopIteration as ex:
- rv: RV = ex.args[0]
+ rv: "RV" = ex.args[0]
return rv
-async def wait_async(gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV:
+async def wait_async(gen: "PQGen[RV]") -> "RV":
"""
Coroutine waiting for a generator to complete.
fd, s = gen.send(ready)
except StopIteration as ex:
- rv: RV = ex.args[0]
+ rv: "RV" = ex.args[0]
return rv