.. __: https://materialize.com/docs/sql/subscribe/
.. __: https://www.cockroachlabs.com/docs/stable/changefeed-for.html
- The parameters are the same of `execute()`.
+ The parameters are the same as in `execute()`, except for `size`, which
+ can be used to retrieve results in chunks instead of row-by-row.
+
+ .. note::
+
+ The `size` parameter is only available from libpq 17; you can use
+ the `~Capabilities.has_stream_chunked` capability to check whether
+ it is supported.
.. warning::
The `!cancel_safe()` method is implemented anyway, but it will use
the legacy :pq:`PQcancel` implementation.
+ .. automethod:: has_stream_chunked
.. automethod:: has_pgbouncer_prepared
.. seealso:: :ref:`pgbouncer`
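
A possible usage sketch (not part of the patch; it assumes psycopg 3.2's
module-level `psycopg.capabilities` object, with connection parameters taken
from the environment), falling back to row-by-row streaming when the libpq is
older than 17::

    import psycopg

    with psycopg.connect() as conn:
        cur = conn.cursor()
        # Use chunks of 100 rows if the linked libpq supports it.
        size = 100 if psycopg.capabilities.has_stream_chunked() else 1
        for row in cur.stream("SELECT generate_series(1, 1000)", size=size):
            print(row)
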
termination (:ticket:`#754`).
- Add support for libpq function to retrieve results in chunks introduced in
libpq v17 (:ticket:`#793`).
+- Add a `size` parameter to `~Cursor.stream()` to allow retrieving results in
+ chunks instead of row-by-row (:ticket:`#794`).
- Add support for libpq function to change role passwords introduced in
libpq v17 (:ticket:`#818`).
"""
return self._has_feature("Connection.cancel_safe()", 170000, check=check)
+ def has_stream_chunked(self, check: bool = False) -> bool:
+ """Check if `Cursor.stream()` can handle a `size` parameter value
+ greater than 1 to retrieve results by chunks.
+
+ The feature requires libpq 17.0 and greater.
+ """
+ return self._has_feature(
+ "Cursor.stream() with 'size' parameter greater than 1", 170000, check=check
+ )
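
A hedged sketch of the two calling conventions, mirroring how the other
capability methods behave: with the default ``check=False`` the method
returns a bool, while ``check=True`` is expected to raise
`!NotSupportedError` when the feature is missing::

    import psycopg

    if not psycopg.capabilities.has_stream_chunked():
        print("chunked retrieval unavailable, falling back to size=1")

    try:
        psycopg.capabilities.has_stream_chunked(check=True)
    except psycopg.NotSupportedError as ex:
        print(f"cannot stream in chunks: {ex}")
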
+
def has_pgbouncer_prepared(self, check: bool = False) -> bool:
"""Check if prepared statements in PgBouncer are supported.
from . import errors as e
from .abc import ConnectionType, Query, Params, PQGen
from .rows import Row, RowMaker
+from ._capabilities import capabilities
from ._column import Column
from .pq.misc import connection_summary
from ._queries import PostgresQuery, PostgresClientQuery
COPY_BOTH = pq.ExecStatus.COPY_BOTH
FATAL_ERROR = pq.ExecStatus.FATAL_ERROR
SINGLE_TUPLE = pq.ExecStatus.SINGLE_TUPLE
+TUPLES_CHUNK = pq.ExecStatus.TUPLES_CHUNK
PIPELINE_ABORTED = pq.ExecStatus.PIPELINE_ABORTED
ACTIVE = pq.TransactionStatus.ACTIVE
# the query said we got tuples (mostly to handle the super useful
# query "SELECT ;"
if res and (
- res.nfields or res.status == TUPLES_OK or res.status == SINGLE_TUPLE
+ res.nfields
+ or res.status == TUPLES_OK
+ or res.status == SINGLE_TUPLE
+ or res.status == TUPLES_CHUNK
):
return [Column(self, i) for i in range(res.nfields)]
else:
params: Params | None = None,
*,
binary: bool | None = None,
+ size: int,
) -> PQGen[None]:
"""Generator to send the query for `Cursor.stream()`."""
yield from self._start_query(query)
pgq = self._convert_query(query, params)
self._execute_send(pgq, binary=binary, force_extended=True)
- self._pgconn.set_single_row_mode()
+ if size < 1:
+ raise ValueError("size must be >= 1")
+ elif size == 1:
+ self._pgconn.set_single_row_mode()
+ else:
+ capabilities.has_stream_chunked(check=True)
+ self._pgconn.set_chunked_rows_mode(size)
self._last_query = query
yield from send(self._pgconn)
return None
status = res.status
- if status == SINGLE_TUPLE:
+ if status == SINGLE_TUPLE or status == TUPLES_CHUNK:
self.pgresult = res
self._tx.set_pgresult(res, set_loaders=first)
if first:
raise ex.with_traceback(None)
def stream(
- self, query: Query, params: Params | None = None, *, binary: bool | None = None
+ self,
+ query: Query,
+ params: Params | None = None,
+ *,
+ binary: bool | None = None,
+ size: int = 1,
) -> Iterator[Row]:
"""
Iterate row-by-row on a result from the database.
+
+ :param size: if greater than 1, results will be retrieved from the
+ server in chunks of this size (but still yielded row-by-row); this
+ is only available from version 17 of the libpq.
"""
if self._pgconn.pipeline_status:
raise e.ProgrammingError("stream() cannot be used in pipeline mode")
with self._conn.lock:
try:
- self._conn.wait(self._stream_send_gen(query, params, binary=binary))
+ self._conn.wait(
+ self._stream_send_gen(query, params, binary=binary, size=size)
+ )
first = True
while self._conn.wait(self._stream_fetchone_gen(first)):
- # We know that, if we got a result, it has a single row.
- rec: Row = self._tx.load_row(0, self._make_row) # type: ignore
- yield rec
+ for pos in range(size):
+ rec = self._tx.load_row(pos, self._make_row)
+ if rec is None:
+ break
+ yield rec
first = False
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
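
An illustrative sketch (assuming libpq >= 17 and an open connection
`conn`): rows are fetched from the server in chunks of 50, but the
consuming loop is unchanged apart from the `size` argument. The
``process()`` helper is hypothetical::

    with conn.cursor() as cur:
        for (n,) in cur.stream("SELECT generate_series(1, 200) AS n", size=50):
            process(n)  # rows still arrive one at a time
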
raise ex.with_traceback(None)
async def stream(
- self, query: Query, params: Params | None = None, *, binary: bool | None = None
+ self,
+ query: Query,
+ params: Params | None = None,
+ *,
+ binary: bool | None = None,
+ size: int = 1,
) -> AsyncIterator[Row]:
"""
Iterate row-by-row on a result from the database.
+
+ :param size: if greater than 1, results will be retrieved from the
+ server in chunks of this size (but still yielded row-by-row); this
+ is only available from version 17 of the libpq.
"""
if self._pgconn.pipeline_status:
raise e.ProgrammingError("stream() cannot be used in pipeline mode")
async with self._conn.lock:
try:
await self._conn.wait(
- self._stream_send_gen(query, params, binary=binary)
+ self._stream_send_gen(query, params, binary=binary, size=size)
)
first = True
while await self._conn.wait(self._stream_fetchone_gen(first)):
- # We know that, if we got a result, it has a single row.
- rec: Row = self._tx.load_row(0, self._make_row) # type: ignore
- yield rec
+ for pos in range(size):
+ rec = self._tx.load_row(pos, self._make_row)
+ if rec is None:
+ break
+ yield rec
first = False
except e._NO_TRACEBACK as ex:
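
The async counterpart works the same way; a sketch assuming an open
`!AsyncConnection` ``aconn`` and libpq >= 17::

    async def consume(aconn):
        async with aconn.cursor() as acur:
            async for row in acur.stream(
                "SELECT generate_series(1, 200)", size=50
            ):
                ...  # the chunked transport is transparent to the iteration
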
COMMAND_OK = pq.ExecStatus.COMMAND_OK
TUPLES_OK = pq.ExecStatus.TUPLES_OK
SINGLE_TUPLE = pq.ExecStatus.SINGLE_TUPLE
+TUPLES_CHUNK = pq.ExecStatus.TUPLES_CHUNK
T = TypeVar("T", covariant=True)
if (
res.status == TUPLES_OK
or res.status == SINGLE_TUPLE
+ or res.status == TUPLES_CHUNK
# "describe" in named cursors
or (res.status == COMMAND_OK and nfields)
):
("has_pipeline", "Connection.pipeline()", 14),
("has_set_trace_flags", "PGconn.set_trace_flags()", 14),
("has_cancel_safe", "Connection.cancel_safe()", 17),
+ ("has_stream_chunked", "Cursor.stream() with 'size' parameter greater than 1", 17),
("has_pgbouncer_prepared", "PgBouncer prepared statements compatibility", 17),
]
assert recs == []
+def test_stream_chunked_invalid_size(conn):
+ cur = conn.cursor()
+ with pytest.raises(ValueError, match="size must be >= 1"):
+ next(cur.stream("select 1", size=0))
+
+
+@pytest.mark.libpq("< 17")
+def test_stream_chunked_not_supported(conn):
+ cur = conn.cursor()
+ with pytest.raises(psycopg.NotSupportedError):
+ next(cur.stream("select generate_series(1, 4)", size=2))
+
+
+@pytest.mark.libpq(">= 17")
+def test_stream_chunked(conn):
+ cur = conn.cursor()
+ recs = list(cur.stream("select generate_series(1, 5) as a", size=2))
+ assert recs == [(1,), (2,), (3,), (4,), (5,)]
+
+
+@pytest.mark.libpq(">= 17")
+def test_stream_chunked_row_factory(conn):
+ cur = conn.cursor(row_factory=rows.scalar_row)
+ it = cur.stream("select generate_series(1, 5) as a", size=2)
+ for i in range(1, 6):
+ assert next(it) == i
+ assert [c.name for c in cur.description] == ["a"]
+
+
@pytest.mark.crdb_skip("no col query")
def test_stream_no_col(conn):
cur = conn.cursor()
assert recs == []
+async def test_stream_chunked_invalid_size(aconn):
+ cur = aconn.cursor()
+ with pytest.raises(ValueError, match=r"size must be >= 1"):
+ await anext(cur.stream("select 1", size=0))
+
+
+@pytest.mark.libpq("< 17")
+async def test_stream_chunked_not_supported(aconn):
+ cur = aconn.cursor()
+ with pytest.raises(psycopg.NotSupportedError):
+ await anext(cur.stream("select generate_series(1, 4)", size=2))
+
+
+@pytest.mark.libpq(">= 17")
+async def test_stream_chunked(aconn):
+ cur = aconn.cursor()
+ recs = await alist(cur.stream("select generate_series(1, 5) as a", size=2))
+ assert recs == [(1,), (2,), (3,), (4,), (5,)]
+
+
+@pytest.mark.libpq(">= 17")
+async def test_stream_chunked_row_factory(aconn):
+ cur = aconn.cursor(row_factory=rows.scalar_row)
+ it = cur.stream("select generate_series(1, 5) as a", size=2)
+ for i in range(1, 6):
+ assert await anext(it) == i
+ assert [c.name for c in cur.description] == ["a"]
+
+
@pytest.mark.crdb_skip("no col query")
async def test_stream_no_col(aconn):
cur = aconn.cursor()