See :ref:`copy` for information about :sql:`COPY`.
+ .. automethod:: stream(query, params=None) -> Iterator[Sequence[Any]]
+
+ This method is similar to `execute()` followed by iterating on the
+ cursor; however, it also supports endless data streams. The feature
+ is not available in PostgreSQL itself, but some implementations
+ exist: Materialize `TAIL`__ and CockroachDB `CHANGEFEED`__ for
+ instance.
+
+ The feature, and the API supporting it, are still experimental.
+ Beware... 👀
+
+ .. __: https://materialize.com/docs/sql/tail/#main
+ .. __: https://www.cockroachlabs.com/docs/stable/changefeed-for.html
+
+ The parameters are the same as in `execute()`.
+
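+ Example, assuming `conn` is an open connection (a minimal sketch;
+ with a streaming-capable source the loop may never terminate):
+
+ .. code:: python
+
+     cur = conn.cursor()
+     for row in cur.stream("select generate_series(1, %s)", [1_000_000]):
+         ...  # rows are loaded one at a time, not buffered client-side
+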
.. attribute:: format
The format of the data returned by the queries. It can be selected
import sys
from types import TracebackType
from typing import Any, AsyncIterator, Callable, Generic, Iterator, List
-from typing import Optional, Sequence, Type, TYPE_CHECKING
+from typing import Optional, NoReturn, Sequence, Type, TYPE_CHECKING
from contextlib import contextmanager
from . import pq
from . import adapt
from . import errors as e
+from . import generators
from .pq import ExecStatus, Format
from .copy import Copy, AsyncCopy
execute = _psycopg3.execute
else:
- from . import generators
-
execute = generators.execute
self._execute_results(results)
+ def _stream_send_gen(
+ self, query: Query, params: Optional[Params] = None
+ ) -> PQGen[None]:
+ """Generator to send the query for `Cursor.stream()`."""
+ yield from self._start_query(query)
+ pgq = self._convert_query(query, params)
+ self._execute_send(pgq, no_pqexec=True)
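+ # Note: single-row mode must be selected right after sending the
+ # query (hence no_pqexec=True: the synchronous PQexec wouldn't
+ # allow it) and before consuming any result.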
+ self._conn.pgconn.set_single_row_mode()
+ self._last_query = query
+
+ def _stream_fetchone_gen(self) -> PQGen[Optional["PGresult"]]:
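+ """Generator to fetch one result for `Cursor.stream()`."""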
+ yield from generators.send(self._conn.pgconn)
+ res = yield from generators.fetch(self._conn.pgconn)
+ if res is None:
+ return None
+
+ elif res.status == ExecStatus.SINGLE_TUPLE:
+ self.pgresult = res # will set it on the transformer too
+ # TODO: the transformer may do excessive work here: create a
+ # path that doesn't clear the loaders every time.
+ return res
+
+ elif res.status in (ExecStatus.TUPLES_OK, ExecStatus.COMMAND_OK):
+ # End of single row results
+ status = res.status
+ while res:
+ res = yield from generators.fetch(self._conn.pgconn)
+ if status != ExecStatus.TUPLES_OK:
+ raise e.ProgrammingError(
+ "the operation in stream() didn't produce a result"
+ )
+ return None
+
+ else:
+ # Errors, unexpected values
+ self._raise_from_results([res])
+ return None # TODO: shouldn't be needed
+
def _start_query(self, query: Optional[Query] = None) -> PQGen[None]:
"""Generator to start the processing of a query.
for res in results:
if res.status not in self._status_ok:
- return self._raise_from_results(results)
+ self._raise_from_results(results)
self._results = list(results)
self.pgresult = results[0]
return
- def _raise_from_results(self, results: Sequence["PGresult"]) -> None:
+ def _raise_from_results(self, results: Sequence["PGresult"]) -> NoReturn:
statuses = {res.status for res in results}
badstats = statuses.difference(self._status_ok)
if results[-1].status == ExecStatus.FATAL_ERROR:
)
elif statuses.intersection(self._status_copy):
raise e.ProgrammingError(
- "COPY cannot be used with execute(); use copy() insead"
+ "COPY cannot be used with this method; use copy() insead"
)
else:
raise e.InternalError(
with self._conn.lock:
self._conn.wait(self._executemany_gen(query, params_seq))
+ def stream(
+ self, query: Query, params: Optional[Params] = None
+ ) -> Iterator[Sequence[Any]]:
+ """
+ Iterate row-by-row on a result from the database.
+ """
+ with self._conn.lock:
+ self._conn.wait(self._stream_send_gen(query, params))
+ while self._conn.wait(self._stream_fetchone_gen()):
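+ # each result in single-row mode contains a single row in position 0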
+ rec = self._tx.load_row(0)
+ assert rec is not None
+ yield rec
+
def fetchone(self) -> Optional[Sequence[Any]]:
"""
Return the next record from the current recordset.
async with self._conn.lock:
await self._conn.wait(self._executemany_gen(query, params_seq))
+ async def stream(
+ self, query: Query, params: Optional[Params] = None
+ ) -> AsyncIterator[Sequence[Any]]:
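+ """
+ Iterate row-by-row on a result from the database.
+ """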
+ async with self._conn.lock:
+ await self._conn.wait(self._stream_send_gen(query, params))
+ while await self._conn.wait(self._stream_fetchone_gen()):
+ rec = self._tx.load_row(0)
+ assert rec is not None
+ yield rec
+
async def fetchone(self) -> Optional[Sequence[Any]]:
self._check_result()
rv = self._tx.load_row(self._pos)
or error).
"""
yield from send(pgconn)
- rv = yield from _fetch(pgconn)
+ rv = yield from fetch_many(pgconn)
return rv
similar. Flush the query and then return the result using nonblocking
functions.
- After this generator has finished you may want to cycle using `_fetch()`
+ After this generator has finished you may want to cycle using `fetch()`
to retrieve the results available.
"""
while 1:
ready = yield Wait.RW
if ready & Ready.R:
# This call may read notifies: they will be saved in the
- # PGconn buffer and passed to Python later, in `_fetch()`.
+ # PGconn buffer and passed to Python later, in `fetch()`.
pgconn.consume_input()
continue
-def _fetch(pgconn: PGconn) -> PQGen[List[PGresult]]:
+def fetch_many(pgconn: PGconn) -> PQGen[List[PGresult]]:
"""
Generator retrieving results from the database without blocking.
Return the list of results returned by the database (whether success
or error).
-
- Note that this generator doesn't yield the socket number, which must have
- been already sent in the sending part of the cycle.
"""
results: List[PGresult] = []
while 1:
- pgconn.consume_input()
- if pgconn.is_busy():
- yield Wait.R
- continue
-
- # Consume notifies
- while 1:
- n = pgconn.notifies()
- if n is None:
- break
- if pgconn.notify_handler:
- pgconn.notify_handler(n)
-
- res = pgconn.get_result()
- if res is None:
+ res = yield from fetch(pgconn)
+ if not res:
break
+
results.append(res)
if res.status in _copy_statuses:
# After entering copy mode the libpq will create a phony result
return results
+def fetch(pgconn: PGconn) -> PQGen[Optional[PGresult]]:
+ """
+ Generator retrieving a single result from the database without blocking.
+
+ The query must have already been sent to the server and pgconn.flush()
+ must have already returned 0.
+
+ Return a result from the database (whether success or error).
+ """
+ while 1:
+ pgconn.consume_input()
+ if not pgconn.is_busy():
+ break
+ yield Wait.R
+
+ # Consume notifies
+ while 1:
+ n = pgconn.notifies()
+ if not n:
+ break
+ if pgconn.notify_handler:
+ pgconn.notify_handler(n)
+
+ return pgconn.get_result()
+
+
_copy_statuses = (
ExecStatus.COPY_IN,
ExecStatus.COPY_OUT,
return data
# Retrieve the final result of copy
- (result,) = yield from _fetch(pgconn)
+ (result,) = yield from fetch_many(pgconn)
if result.status != ExecStatus.COMMAND_OK:
encoding = py_codecs.get(
pgconn.parameter_status(b"client_encoding") or "", "utf-8"
break
# Retrieve the final result of copy
- (result,) = yield from _fetch(pgconn)
+ (result,) = yield from fetch_many(pgconn)
if result.status != ExecStatus.COMMAND_OK:
encoding = py_codecs.get(
pgconn.parameter_status(b"client_encoding") or "", "utf-8"
PQflush.restype = c_int
+# 33.5. Retrieving Query Results Row-by-Row
+PQsetSingleRowMode = pq.PQsetSingleRowMode
+PQsetSingleRowMode.argtypes = [PGconn_ptr]
+PQsetSingleRowMode.restype = c_int
+
+
# 33.6. Canceling Queries in Progress
PQgetCancel = pq.PQgetCancel
def PQsetnonblocking(arg1: Optional[PGconn_struct], arg2: int) -> int: ...
def PQisnonblocking(arg1: Optional[PGconn_struct]) -> int: ...
def PQflush(arg1: Optional[PGconn_struct]) -> int: ...
+def PQsetSingleRowMode(arg1: Optional[PGconn_struct]) -> int: ...
def PQgetCancel(arg1: Optional[PGconn_struct]) -> PGcancel_struct: ...
def PQfreeCancel(arg1: Optional[PGcancel_struct]) -> None: ...
def PQputCopyData(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int) -> int: ...
raise PQerror(f"flushing failed: {error_message(self)}")
return rv
+ def set_single_row_mode(self) -> None:
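+ """Select single-row mode for the currently-executing query."""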
+ if not impl.PQsetSingleRowMode(self.pgconn_ptr):
+ raise PQerror("setting single row mode failed")
+
def get_cancel(self) -> "PGcancel":
"""
Create an object with the information needed to cancel a command.
def flush(self) -> int:
...
+ def set_single_row_mode(self) -> None:
+ ...
+
def get_cancel(self) -> "PGcancel":
...
int PQisnonblocking(const PGconn *conn)
int PQflush(PGconn *conn)
+ # 33.5. Retrieving Query Results Row-by-Row
+ int PQsetSingleRowMode(PGconn *conn)
+
# 33.6. Canceling Queries in Progress
PGcancel *PQgetCancel(PGconn *conn)
void PQfreeCancel(PGcancel *cancel)
raise PQerror(f"flushing failed: {error_message(self)}")
return rv
+ def set_single_row_mode(self) -> None:
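+ # PQsetSingleRowMode returns 0 if the mode cannot be selected now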
+ if not libpq.PQsetSingleRowMode(self.pgconn_ptr):
+ raise PQerror("setting single row mode failed")
+
def get_cancel(self) -> PGcancel:
cdef libpq.PGcancel *ptr = libpq.PQgetCancel(self.pgconn_ptr)
if not ptr:
pgconn.send_query(b"select 1")
+def test_single_row_mode(pgconn):
+ pgconn.send_query(b"select generate_series(1,2)")
+ pgconn.set_single_row_mode()
+
+ results = execute_wait(pgconn)
+ assert len(results) == 3
+
+ res = results[0]
+ assert res.status == pq.ExecStatus.SINGLE_TUPLE
+ assert res.ntuples == 1
+ assert res.get_value(0, 0) == b"1"
+
+ res = results[1]
+ assert res.status == pq.ExecStatus.SINGLE_TUPLE
+ assert res.ntuples == 1
+ assert res.get_value(0, 0) == b"2"
+
+ res = results[2]
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.ntuples == 0
+
+
def test_send_query_params(pgconn):
pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
(res,) = execute_wait(pgconn)
pgconn.ssl_in_use
+def test_set_single_row_mode(pgconn):
+ with pytest.raises(pq.PQerror):
+ pgconn.set_single_row_mode()
+
+ pgconn.send_query(b"select 1")
+ pgconn.set_single_row_mode()
+
+
def test_cancel(pgconn):
cancel = pgconn.get_cancel()
cancel.cancel()
import gc
import pickle
import weakref
+import datetime as dt
import pytest
import psycopg3
+from psycopg3 import sql
from psycopg3.oids import builtins
from psycopg3.adapt import Format
# assert cur.params == [b"x"]
+def test_stream(conn):
+ cur = conn.cursor()
+ recs = []
+ for rec in cur.stream(
+ "select i, '2021-01-01'::date + i from generate_series(1, %s) as i",
+ [2],
+ ):
+ recs.append(rec)
+
+ assert recs == [(1, dt.date(2021, 1, 2)), (2, dt.date(2021, 1, 3))]
+
+
+def test_stream_sql(conn):
+ cur = conn.cursor()
+ recs = list(
+ cur.stream(
+ sql.SQL(
+ "select i, '2021-01-01'::date + i from generate_series(1, {}) as i"
+ ).format(2)
+ )
+ )
+
+ assert recs == [(1, dt.date(2021, 1, 2)), (2, dt.date(2021, 1, 3))]
+
+
+@pytest.mark.parametrize(
+ "query",
+ [
+ "create table test_stream_badq ()",
+ "copy (select 1) to stdout",
+ "wat?",
+ ],
+)
+def test_stream_badquery(conn, query):
+ cur = conn.cursor()
+ with pytest.raises(psycopg3.ProgrammingError):
+ for rec in cur.stream(query):
+ pass
+
+
class TestColumn:
def test_description_attribs(self, conn):
curs = conn.cursor()
import gc
import pytest
import weakref
+import datetime as dt
import psycopg3
+from psycopg3 import sql
from psycopg3.adapt import Format
pytestmark = pytest.mark.asyncio
# assert cur.params == [b"x"]
+async def test_stream(aconn):
+ cur = await aconn.cursor()
+ recs = []
+ async for rec in cur.stream(
+ "select i, '2021-01-01'::date + i from generate_series(1, %s) as i",
+ [2],
+ ):
+ recs.append(rec)
+
+ assert recs == [(1, dt.date(2021, 1, 2)), (2, dt.date(2021, 1, 3))]
+
+
+async def test_stream_sql(aconn):
+ cur = await aconn.cursor()
+ recs = []
+ async for rec in cur.stream(
+ sql.SQL(
+ "select i, '2021-01-01'::date + i from generate_series(1, {}) as i"
+ ).format(2)
+ ):
+ recs.append(rec)
+
+ assert recs == [(1, dt.date(2021, 1, 2)), (2, dt.date(2021, 1, 3))]
+
+
+@pytest.mark.parametrize(
+ "query",
+ [
+ "create table test_stream_badq ()",
+ "copy (select 1) to stdout",
+ "wat?",
+ ],
+)
+async def test_stream_badquery(aconn, query):
+ cur = await aconn.cursor()
+ with pytest.raises(psycopg3.ProgrammingError):
+ async for rec in cur.stream(query):
+ pass
+
+
async def test_str(aconn):
cur = await aconn.cursor()
assert "[IDLE]" in str(cur)