from typing import Any, Iterable, List, Optional, Sequence, Tuple
from .proto import AdaptContext, DumpFunc, DumpersMap, DumperType
-from .proto import LoadFunc, LoadersMap, LoaderType, MaybeOid
+from .proto import LoadFunc, LoadersMap, LoaderType, MaybeOid, PQGen
from .connection import BaseConnection
from . import pq
def lookup_loader(self, oid: int, format: Format) -> LoaderType: ...
def register_builtin_c_loaders() -> None: ...
+def connect(conninfo: str) -> PQGen[pq.proto.PGconn]: ...
+def execute(pgconn: pq.proto.PGconn) -> PQGen[List[pq.proto.PGresult]]: ...
# vim: set syntax=python:
include "types/numeric.pyx"
include "types/text.pyx"
+include "generators.pyx"
include "adapt.pyx"
include "transform.pyx"
import logging
import asyncio
import threading
-from typing import Any, Optional, Type
-from typing import cast, TYPE_CHECKING
+from typing import Any, Callable, List, Optional, Type, cast
from . import pq
from . import errors as e
from . import cursor
-from . import generators
from . import proto
from .conninfo import make_conninfo
from .waiting import wait, wait_async
logger = logging.getLogger(__name__)
-if TYPE_CHECKING:
- from .proto import PQGen, RV
+
+connect: Callable[[str], proto.PQGen[pq.proto.PGconn]]
+execute: Callable[[pq.proto.PGconn], proto.PQGen[List[pq.proto.PGresult]]]
+
+if pq.__impl__ == "c":
+ from . import _psycopg3
+
+ connect = _psycopg3.connect
+ execute = _psycopg3.execute
+
+else:
+ from . import generators
+
+ connect = generators.connect
+ execute = generators.execute
class BaseConnection:
if conninfo is None and not kwargs:
raise TypeError("missing conninfo and not parameters specified")
conninfo = make_conninfo(conninfo or "", **kwargs)
- gen = generators.connect(conninfo)
+ gen = connect(conninfo)
pgconn = cls.wait(gen)
return cls(pgconn)
return
self.pgconn.send_query(command)
- (pgres,) = self.wait(generators.execute(self.pgconn))
+ (pgres,) = self.wait(execute(self.pgconn))
if pgres.status != pq.ExecStatus.COMMAND_OK:
raise e.OperationalError(
f"error on {command.decode('utf8')}:"
)
@classmethod
- def wait(cls, gen: "PQGen[RV]", timeout: Optional[float] = 0.1) -> "RV":
+ def wait(
+ cls, gen: proto.PQGen[proto.RV], timeout: Optional[float] = 0.1
+ ) -> proto.RV:
return wait(gen, timeout=timeout)
def set_client_encoding(self, value: str) -> None:
b"select set_config('client_encoding', $1, false)",
[value.encode("ascii")],
)
- gen = generators.execute(self.pgconn)
+ gen = execute(self.pgconn)
(result,) = self.wait(gen)
if result.status != pq.ExecStatus.TUPLES_OK:
raise e.error_from_result(result)
if conninfo is None and not kwargs:
raise TypeError("missing conninfo and not parameters specified")
conninfo = make_conninfo(conninfo or "", **kwargs)
- gen = generators.connect(conninfo)
+ gen = connect(conninfo)
pgconn = await cls.wait(gen)
return cls(pgconn)
return
self.pgconn.send_query(command)
- (pgres,) = await self.wait(generators.execute(self.pgconn))
+ (pgres,) = await self.wait(execute(self.pgconn))
if pgres.status != pq.ExecStatus.COMMAND_OK:
raise e.OperationalError(
f"error on {command.decode('utf8')}:"
)
@classmethod
- async def wait(cls, gen: "PQGen[RV]") -> "RV":
+ async def wait(cls, gen: proto.PQGen[proto.RV]) -> proto.RV:
return await wait_async(gen)
async def set_client_encoding(self, value: str) -> None:
b"select set_config('client_encoding', $1, false)",
[value.encode("ascii")],
)
- gen = generators.execute(self.pgconn)
+ gen = execute(self.pgconn)
(result,) = await self.wait(gen)
if result.status != pq.ExecStatus.TUPLES_OK:
raise e.error_from_result(result)
import codecs
from operator import attrgetter
-from typing import Any, List, Optional, Sequence, TYPE_CHECKING
+from typing import Any, Callable, List, Optional, Sequence, TYPE_CHECKING
from . import errors as e
from . import pq
-from . import generators
from . import proto
-from .proto import Query, Params, DumpersMap, LoadersMap
+from .proto import Query, Params, DumpersMap, LoadersMap, PQGen
from .utils.queries import PostgresQuery
if TYPE_CHECKING:
from .connection import BaseConnection, Connection, AsyncConnection
+execute: Callable[[pq.proto.PGconn], PQGen[List[pq.proto.PGresult]]]
+
+if pq.__impl__ == "c":
+ from . import _psycopg3
+
+ execute = _psycopg3.execute
+
+else:
+ from . import generators
+
+ execute = generators.execute
+
class Column(Sequence[Any]):
def __init__(
with self.connection.lock:
self._start_query()
self._execute_send(query, vars)
- gen = generators.execute(self.connection.pgconn)
+ gen = execute(self.connection.pgconn)
results = self.connection.wait(gen)
self._execute_results(results)
return self
for i, vars in enumerate(vars_seq):
if i == 0:
pgq = self._send_prepare(b"", query, vars)
- gen = generators.execute(self.connection.pgconn)
+ gen = execute(self.connection.pgconn)
(result,) = self.connection.wait(gen)
if result.status == self.ExecStatus.FATAL_ERROR:
raise e.error_from_result(result)
pgq.dump(vars)
self._send_query_prepared(b"", pgq)
- gen = generators.execute(self.connection.pgconn)
+ gen = execute(self.connection.pgconn)
(result,) = self.connection.wait(gen)
self._execute_results((result,))
async with self.connection.lock:
self._start_query()
self._execute_send(query, vars)
- gen = generators.execute(self.connection.pgconn)
+ gen = execute(self.connection.pgconn)
results = await self.connection.wait(gen)
self._execute_results(results)
return self
for i, vars in enumerate(vars_seq):
if i == 0:
pgq = self._send_prepare(b"", query, vars)
- gen = generators.execute(self.connection.pgconn)
+ gen = execute(self.connection.pgconn)
(result,) = await self.connection.wait(gen)
if result.status == self.ExecStatus.FATAL_ERROR:
raise e.error_from_result(result)
pgq.dump(vars)
self._send_query_prepared(b"", pgq)
- gen = generators.execute(self.connection.pgconn)
+ gen = execute(self.connection.pgconn)
(result,) = await self.connection.wait(gen)
self._execute_results((result,))
import logging
from typing import List
-from .waiting import Wait, Ready
from . import pq
from . import errors as e
from .proto import PQGen
+from .waiting import Wait, Ready
logger = logging.getLogger(__name__)
--- /dev/null
+"""
+C implementation of generators for the communication protocols with the libpq
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+import logging
+from typing import List
+
+from . import errors as e
+from .proto import PQGen
+from .waiting import Wait, Ready
+from psycopg3 import pq
+from psycopg3.pq cimport libpq
+from psycopg3.pq.pq_cython cimport PGconn, PGresult
+
+cdef object WAIT_W = Wait.W
+cdef object WAIT_R = Wait.R
+cdef object WAIT_RW = Wait.RW
+cdef int READY_R = Ready.R
+
+def connect(conninfo: str) -> PQGen[pq.proto.PGconn]:
+ """
+ Generator to create a database connection without blocking.
+
+ """
+ cdef PGconn conn = PGconn.connect_start(conninfo.encode("utf8"))
+ logger.debug("connection started, status %s", conn.status.name)
+ cdef libpq.PGconn *pgconn_ptr = conn.pgconn_ptr
+ cdef int conn_status = libpq.PQstatus(pgconn_ptr)
+ cdef int poll_status
+
+ while 1:
+ if conn_status == libpq.CONNECTION_BAD:
+ raise e.OperationalError(
+ f"connection is bad: {pq.error_message(conn)}"
+ )
+
+ poll_status = libpq.PQconnectPoll(pgconn_ptr)
+ logger.debug("connection polled, status %s", conn.status.name)
+ if poll_status == libpq.PGRES_POLLING_OK:
+ break
+ elif poll_status == libpq.PGRES_POLLING_READING:
+ yield (libpq.PQsocket(pgconn_ptr), WAIT_R)
+ elif poll_status == libpq.PGRES_POLLING_WRITING:
+ yield (libpq.PQsocket(pgconn_ptr), WAIT_W)
+ elif poll_status == libpq.PGRES_POLLING_FAILED:
+ raise e.OperationalError(
+ f"connection failed: {pq.error_message(conn)}"
+ )
+ else:
+ raise e.InternalError(f"unexpected poll status: {poll_status}")
+
+ conn.nonblocking = 1
+ return conn
+
+
+def execute(PGconn pgconn) -> PQGen[List[pq.proto.PGresult]]:
+ """
+ Generator sending a query and returning results without blocking.
+
+ The query must have already been sent using `pgconn.send_query()` or
+ similar. Flush the query and then return the result using nonblocking
+ functions.
+
+ Return the list of results returned by the database (whether success
+ or error).
+ """
+ results: List[pq.proto.PGresult] = []
+ cdef libpq.PGconn *pgconn_ptr = pgconn.pgconn_ptr
+ cdef int status
+
+ # Sending the query
+ while 1:
+ if libpq.PQflush(pgconn_ptr) == 0:
+ break
+
+ status = yield libpq.PQsocket(pgconn_ptr), WAIT_RW
+ if status & READY_R:
+ if 1 != libpq.PQconsumeInput(pgconn_ptr):
+ raise pq.PQerror(
+ f"consuming input failed: {pq.error_message(pgconn)}")
+ continue
+
+ wr = (libpq.PQsocket(pgconn_ptr), WAIT_R)
+
+ # Fetching the result
+ while 1:
+ if 1 != libpq.PQconsumeInput(pgconn_ptr):
+ raise pq.PQerror(
+ f"consuming input failed: {pq.error_message(pgconn)}")
+ if libpq.PQisBusy(pgconn_ptr):
+ yield wr
+ continue
+
+ res = libpq.PQgetResult(pgconn_ptr)
+ if res is NULL:
+ break
+ results.append(PGresult._from_ptr(res))
+
+ status = libpq.PQresultStatus(res)
+ if status in (libpq.PGRES_COPY_IN, libpq.PGRES_COPY_OUT, libpq.PGRES_COPY_BOTH):
+ # After entering copy mode the libpq will create a phony result
+ # for every request so let's break the endless loop.
+ break
+
+ return results
import pytest
from select import select
import psycopg3
+from psycopg3.generators import execute
def test_send_query(pq, pgconn):
b"/* %s */ select pg_sleep(0.01); select 1 as foo;"
% (b"x" * 1_000_000)
)
- results = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ results = psycopg3.waiting.wait(execute(pgconn))
assert len(results) == 2
assert results[0].nfields == 1
def test_send_query_params(pq, pgconn):
pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
- (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ (res,) = psycopg3.waiting.wait(execute(pgconn))
assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"8"
def test_send_prepare(pq, pgconn):
pgconn.send_prepare(b"prep", b"select $1::int + $2::int")
- (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ (res,) = psycopg3.waiting.wait(execute(pgconn))
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
pgconn.send_query_prepared(b"prep", [b"3", b"5"])
- (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ (res,) = psycopg3.waiting.wait(execute(pgconn))
assert res.get_value(0, 0) == b"8"
pgconn.finish()
def test_send_prepare_types(pq, pgconn):
pgconn.send_prepare(b"prep", b"select $1 + $2", [23, 23])
- (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ (res,) = psycopg3.waiting.wait(execute(pgconn))
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
pgconn.send_query_prepared(b"prep", [b"3", b"5"])
- (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ (res,) = psycopg3.waiting.wait(execute(pgconn))
assert res.get_value(0, 0) == b"8"
def test_send_prepared_binary_in(pq, pgconn):
val = b"foo\00bar"
pgconn.send_prepare(b"", b"select length($1::bytea), length($2::bytea)")
- (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ (res,) = psycopg3.waiting.wait(execute(pgconn))
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
pgconn.send_query_prepared(b"", [val, val], param_formats=[0, 1])
- (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ (res,) = psycopg3.waiting.wait(execute(pgconn))
assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"3"
assert res.get_value(0, 1) == b"7"
def test_send_prepared_binary_out(pq, pgconn, fmt, out):
val = b"foo\00bar"
pgconn.send_prepare(b"", b"select $1::bytea")
- (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ (res,) = psycopg3.waiting.wait(execute(pgconn))
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
pgconn.send_query_prepared(
b"", [val], param_formats=[1], result_format=fmt
)
- (res,) = psycopg3.waiting.wait(psycopg3.generators.execute(pgconn))
+ (res,) = psycopg3.waiting.wait(execute(pgconn))
assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == out
return pgconn
yield
- monkeypatch.setattr(psycopg3.generators, "connect", fake_connect)
+ monkeypatch.setattr(psycopg3.connection, "connect", fake_connect)
loop.run_until_complete(
psycopg3.AsyncConnection.connect(testdsn, **kwargs)
)
return pgconn
yield
- monkeypatch.setattr(psycopg3.generators, "connect", fake_connect)
+ monkeypatch.setattr(psycopg3.connection, "connect", fake_connect)
with pytest.raises((TypeError, psycopg3.ProgrammingError)):
loop.run_until_complete(
psycopg3.AsyncConnection.connect(*args, **kwargs)
return pgconn
yield
- monkeypatch.setattr(psycopg3.generators, "connect", fake_connect)
+ monkeypatch.setattr(psycopg3.connection, "connect", fake_connect)
psycopg3.Connection.connect(testdsn, **kwargs)
assert conninfo_to_dict(the_conninfo) == conninfo_to_dict(want)
return pgconn
yield
- monkeypatch.setattr(psycopg3.generators, "connect", fake_connect)
+ monkeypatch.setattr(psycopg3.connection, "connect", fake_connect)
with pytest.raises((TypeError, psycopg3.ProgrammingError)):
psycopg3.Connection.connect(*args, **kwargs)
return pgconn
yield
- monkeypatch.setattr(psycopg3.generators, "connect", fake_connect)
+ monkeypatch.setattr(psycopg3.connection, "connect", fake_connect)
psycopg3.connect(testdsn, **kwargs)
assert conninfo_to_dict(the_conninfo) == conninfo_to_dict(want)
return pgconn
yield
- monkeypatch.setattr(psycopg3.generators, "connect", fake_connect)
+ monkeypatch.setattr(psycopg3.connection, "connect", fake_connect)
with pytest.raises((TypeError, psycopg3.ProgrammingError)):
psycopg3.connect(*args, **kwargs)