This should make the query operation marginally faster.
import threading
from types import TracebackType
from typing import Any, AsyncIterator, Callable, Iterator, List, NamedTuple
-from typing import Optional, Type, TYPE_CHECKING
+from typing import Optional, Type, TYPE_CHECKING, Union
from weakref import ref, ReferenceType
from functools import partial
from contextlib import contextmanager
from . import encodings
from .pq import TransactionStatus, ExecStatus, Format
from .sql import Composable
-from .proto import DumpersMap, LoadersMap, PQGen, RV, Query
+from .proto import DumpersMap, LoadersMap, PQGen, PQGenConn, RV, Query
from .waiting import wait, wait_async
from .conninfo import make_conninfo
from .generators import notifies
logger = logging.getLogger(__name__)
package_logger = logging.getLogger("psycopg3")
-connect: Callable[[str], PQGen["PGconn"]]
+connect: Callable[[str], PQGenConn["PGconn"]]
execute: Callable[["PGconn"], PQGen[List["PGresult"]]]
if TYPE_CHECKING:
yield tx
@classmethod
- def wait(cls, gen: PQGen[RV], timeout: Optional[float] = 0.1) -> RV:
+ def wait(
+ cls,
+ gen: Union[PQGen[RV], PQGenConn[RV]],
+ timeout: Optional[float] = 0.1,
+ ) -> RV:
return wait(gen, timeout=timeout)
def _set_client_encoding(self, name: str) -> None:
yield tx
@classmethod
- async def wait(cls, gen: PQGen[RV]) -> RV:
+ async def wait(cls, gen: Union[PQGen[RV], PQGenConn[RV]]) -> RV:
return await wait_async(gen)
def _set_client_encoding(self, name: str) -> None:
from . import pq
from . import errors as e
from .pq import ConnStatus, PollingStatus, ExecStatus
-from .proto import PQGen
+from .proto import PQGen, PQGenConn
from .waiting import Wait, Ready
from .encodings import py_codecs
from .pq.proto import PGconn, PGresult
logger = logging.getLogger(__name__)
-def connect(conninfo: str) -> PQGen[PGconn]:
+def connect(conninfo: str) -> PQGenConn[PGconn]:
"""
Generator to create a database connection without blocking.
or error).
"""
yield from send(pgconn)
- rv = yield from fetch(pgconn)
+ rv = yield from _fetch(pgconn)
return rv
similar. Flush the query and then return the result using nonblocking
functions.
- After this generator has finished you may want to cycle using `fetch()`
+ After this generator has finished you may want to cycle using `_fetch()`
to retrieve the results available.
"""
+ yield pgconn.socket
while 1:
f = pgconn.flush()
if f == 0:
break
- ready = yield pgconn.socket, Wait.RW
+ ready = yield Wait.RW
if ready & Ready.R:
# This call may read notifies: they will be saved in the
- # PGconn buffer and passed to Python later, in `fetch()`.
+ # PGconn buffer and passed to Python later, in `_fetch()`.
pgconn.consume_input()
continue
-def fetch(pgconn: PGconn) -> PQGen[List[PGresult]]:
+def _fetch(pgconn: PGconn) -> PQGen[List[PGresult]]:
"""
Generator retrieving results from the database without blocking.
Return the list of results returned by the database (whether success
or error).
+
+ Note that this generator doesn't yield the socket number, which must have
+ been already sent in the sending part of the cycle.
"""
results: List[PGresult] = []
while 1:
pgconn.consume_input()
if pgconn.is_busy():
- yield pgconn.socket, Wait.R
+ yield Wait.R
continue
# Consume notifies
def notifies(pgconn: PGconn) -> PQGen[List[pq.PGnotify]]:
- yield pgconn.socket, Wait.R
+ yield pgconn.socket
+ yield Wait.R
pgconn.consume_input()
ns = []
def copy_from(pgconn: PGconn) -> PQGen[Union[bytes, PGresult]]:
+ yield pgconn.socket
while 1:
nbytes, data = pgconn.get_copy_data(1)
if nbytes != 0:
break
# would block
- yield pgconn.socket, Wait.R
+ yield Wait.R
pgconn.consume_input()
if nbytes > 0:
return data
# Retrieve the final result of copy
- (result,) = yield from fetch(pgconn)
+ (result,) = yield from _fetch(pgconn)
if result.status != ExecStatus.COMMAND_OK:
encoding = py_codecs.get(
pgconn.parameter_status(b"client_encoding") or "", "utf-8"
def copy_to(pgconn: PGconn, buffer: bytes) -> PQGen[None]:
+ yield pgconn.socket
# Retry enqueuing data until successful
while pgconn.put_copy_data(buffer) == 0:
- yield pgconn.socket, Wait.W
+ yield Wait.W
def copy_end(pgconn: PGconn, error: Optional[bytes]) -> PQGen[PGresult]:
+ yield pgconn.socket
# Retry enqueuing end copy message until successful
while pgconn.put_copy_end(error) == 0:
- yield pgconn.socket, Wait.W
+ yield Wait.W
# Repeat until it the message is flushed to the server
while 1:
- yield pgconn.socket, Wait.W
+ yield Wait.W
f = pgconn.flush()
if f == 0:
break
# Retrieve the final result of copy
- (result,) = yield from fetch(pgconn)
+ (result,) = yield from _fetch(pgconn)
if result.status != ExecStatus.COMMAND_OK:
encoding = py_codecs.get(
pgconn.parameter_status(b"client_encoding") or "", "utf-8"
# Waiting protocol types
RV = TypeVar("RV")
-PQGen = Generator[Tuple[int, "Wait"], "Ready", RV]
+PQGenConn = Generator[Tuple[int, "Wait"], "Ready", RV]
+"""Generator for processes where the connection file number can change.
+
+This can happen in connection and reset, but not in normal querying.
+"""
+
+PQGen = Generator[Union[int, "Wait"], "Ready", RV]
+"""Generator for processes where the connection file number won't change.
+
+The first item generated is the file descriptor; following items are be the
+Wait states.
+"""
# Adaptation types
from enum import IntEnum
-from typing import Optional
+from typing import Optional, Union
from asyncio import get_event_loop, Event
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from . import errors as e
-from .proto import PQGen, RV
+from .proto import PQGen, PQGenConn, RV
class Wait(IntEnum):
W = EVENT_WRITE
-def wait(gen: PQGen[RV], timeout: Optional[float] = None) -> RV:
+def wait(
+ gen: Union[PQGen[RV], PQGenConn[RV]], timeout: Optional[float] = None
+) -> RV:
"""
Wait for a generator using the best option available on the platform.
:type timeout: float
:return: whatever *gen* returns on completion.
"""
+ fd: int
+ s: Wait
sel = DefaultSelector()
try:
- fd, s = next(gen)
- while 1:
- sel.register(fd, s)
- ready = None
- while not ready:
- ready = sel.select(timeout=timeout)
- sel.unregister(fd)
-
- assert len(ready) == 1
- fd, s = gen.send(ready[0][1])
+ # Use the first generated item to tell if it's a PQgen or PQgenConn.
+ # Note: mypy gets confused by the behaviour of this generator.
+ item = next(gen)
+ if isinstance(item, tuple):
+ fd, s = item
+ while 1:
+ sel.register(fd, s)
+ ready = None
+ while not ready:
+ ready = sel.select(timeout=timeout)
+ sel.unregister(fd)
+
+ assert len(ready) == 1
+ fd, s = gen.send(ready[0][1])
+ else:
+ fd = item # type: ignore[assignment]
+ s = next(gen) # type: ignore[assignment]
+ while 1:
+ sel.register(fd, s)
+ ready = None
+ while not ready:
+ ready = sel.select(timeout=timeout)
+ sel.unregister(fd)
+
+ assert len(ready) == 1
+ s = gen.send(ready[0][1]) # type: ignore[arg-type,assignment]
except StopIteration as ex:
rv: RV = ex.args[0] if ex.args else None
return rv
-async def wait_async(gen: PQGen[RV]) -> RV:
+async def wait_async(gen: Union[PQGen[RV], PQGenConn[RV]]) -> RV:
"""
Coroutine waiting for a generator to complete.
ev = Event()
loop = get_event_loop()
ready: Ready
+ fd: int
+ s: Wait
def wakeup(state: Ready) -> None:
nonlocal ready
ev.set()
try:
- fd, s = next(gen)
- while 1:
- ev.clear()
- if s == Wait.R:
- loop.add_reader(fd, wakeup, Ready.R)
- await ev.wait()
- loop.remove_reader(fd)
- elif s == Wait.W:
- loop.add_writer(fd, wakeup, Ready.W)
- await ev.wait()
- loop.remove_writer(fd)
- elif s == Wait.RW:
- loop.add_reader(fd, wakeup, Ready.R)
- loop.add_writer(fd, wakeup, Ready.W)
- await ev.wait()
- loop.remove_reader(fd)
- loop.remove_writer(fd)
- else:
- raise e.InternalError("bad poll status: %s")
- fd, s = gen.send(ready)
+ # Use the first generated item to tell if it's a PQgen or PQgenConn.
+ # Note: mypy gets confused by the behaviour of this generator.
+ item = next(gen)
+ if isinstance(item, tuple):
+ fd, s = item
+ while 1:
+ ev.clear()
+ if s == Wait.R:
+ loop.add_reader(fd, wakeup, Ready.R)
+ await ev.wait()
+ loop.remove_reader(fd)
+ elif s == Wait.W:
+ loop.add_writer(fd, wakeup, Ready.W)
+ await ev.wait()
+ loop.remove_writer(fd)
+ elif s == Wait.RW:
+ loop.add_reader(fd, wakeup, Ready.R)
+ loop.add_writer(fd, wakeup, Ready.W)
+ await ev.wait()
+ loop.remove_reader(fd)
+ loop.remove_writer(fd)
+ else:
+ raise e.InternalError("bad poll status: %s")
+ fd, s = gen.send(ready) # type: ignore[misc]
+ else:
+ fd = item # type: ignore[assignment]
+ s = next(gen) # type: ignore[assignment]
+ while 1:
+ ev.clear()
+ if s == Wait.R:
+ loop.add_reader(fd, wakeup, Ready.R)
+ await ev.wait()
+ loop.remove_reader(fd)
+ elif s == Wait.W:
+ loop.add_writer(fd, wakeup, Ready.W)
+ await ev.wait()
+ loop.remove_writer(fd)
+ elif s == Wait.RW:
+ loop.add_reader(fd, wakeup, Ready.R)
+ loop.add_writer(fd, wakeup, Ready.W)
+ await ev.wait()
+ loop.remove_reader(fd)
+ loop.remove_writer(fd)
+ else:
+ raise e.InternalError("bad poll status: %s")
+ s = gen.send(ready) # type: ignore[arg-type,assignment]
except StopIteration as ex:
rv: RV = ex.args[0] if ex.args else None
from psycopg3.adapt import Dumper, Loader
from psycopg3.proto import AdaptContext, DumpFunc, DumpersMap, DumperType
-from psycopg3.proto import LoadFunc, LoadersMap, LoaderType, PQGen
+from psycopg3.proto import LoadFunc, LoadersMap, LoaderType, PQGen, PQGenConn
from psycopg3.connection import BaseConnection
from psycopg3 import pq
def get_loader(self, oid: int, format: pq.Format) -> Loader: ...
def register_builtin_c_adapters() -> None: ...
-def connect(conninfo: str) -> PQGen[pq.proto.PGconn]: ...
+def connect(conninfo: str) -> PQGenConn[pq.proto.PGconn]: ...
def execute(pgconn: pq.proto.PGconn) -> PQGen[List[pq.proto.PGresult]]: ...
# vim: set syntax=python:
cdef object WAIT_RW = Wait.RW
cdef int READY_R = Ready.R
-def connect(conninfo: str) -> PQGen[pq.proto.PGconn]:
+def connect(conninfo: str) -> PQGenConn[pq.proto.PGconn]:
"""
Generator to create a database connection without blocking.
cdef int status
cdef libpq.PGnotify *notify
+ # Start the generator by sending the connection fd, which won't change
+ # during the query process.
+ yield libpq.PQsocket(pgconn_ptr)
+
# Sending the query
while 1:
if libpq.PQflush(pgconn_ptr) == 0:
break
- status = yield libpq.PQsocket(pgconn_ptr), WAIT_RW
+ status = yield WAIT_RW
if status & READY_R:
# This call may read notifies which will be saved in the
# PGconn buffer and passed to Python later.
f"consuming input failed: {pq.error_message(pgconn)}")
continue
- wr = (libpq.PQsocket(pgconn_ptr), WAIT_R)
-
# Fetching the result
while 1:
if 1 != libpq.PQconsumeInput(pgconn_ptr):
raise pq.PQerror(
f"consuming input failed: {pq.error_message(pgconn)}")
if libpq.PQisBusy(pgconn_ptr):
- yield wr
+ yield WAIT_R
continue
# Consume notifies