From d0b28e8d1ec6fff532df38210ecff70ae0041997 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sun, 4 May 2025 18:30:34 +0200 Subject: [PATCH] refactor: split async/sync server cursor in two modules --- psycopg/psycopg/__init__.py | 3 +- psycopg/psycopg/_server_cursor.py | 154 ++++++++ psycopg/psycopg/_server_cursor_async.py | 144 ++++++++ psycopg/psycopg/_server_cursor_base.py | 189 ++++++++++ psycopg/psycopg/connection.py | 2 +- psycopg/psycopg/connection_async.py | 2 +- psycopg/psycopg/raw_cursor.py | 3 +- psycopg/psycopg/server_cursor.py | 455 ------------------------ 8 files changed, 493 insertions(+), 459 deletions(-) create mode 100644 psycopg/psycopg/_server_cursor.py create mode 100644 psycopg/psycopg/_server_cursor_async.py create mode 100644 psycopg/psycopg/_server_cursor_base.py delete mode 100644 psycopg/psycopg/server_cursor.py diff --git a/psycopg/psycopg/__init__.py b/psycopg/psycopg/__init__.py index 581d9b23b..10f6f59e9 100644 --- a/psycopg/psycopg/__init__.py +++ b/psycopg/psycopg/__init__.py @@ -26,10 +26,11 @@ from .transaction import AsyncTransaction, Rollback, Transaction from .cursor_async import AsyncCursor from ._capabilities import Capabilities, capabilities from .client_cursor import AsyncClientCursor, ClientCursor -from .server_cursor import AsyncServerCursor, ServerCursor +from ._server_cursor import ServerCursor from ._connection_base import BaseConnection, Notify from ._connection_info import ConnectionInfo from .connection_async import AsyncConnection +from ._server_cursor_async import AsyncServerCursor # Set the logger to a quiet default, can be enabled if needed if (logger := logging.getLogger("psycopg")).level == logging.NOTSET: diff --git a/psycopg/psycopg/_server_cursor.py b/psycopg/psycopg/_server_cursor.py new file mode 100644 index 000000000..86d0b87c8 --- /dev/null +++ b/psycopg/psycopg/_server_cursor.py @@ -0,0 +1,154 @@ +""" +psycopg server-side cursor objects. +""" + +# Copyright (C) 2020 The Psycopg Team + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, overload +from warnings import warn +from collections.abc import Iterable, Iterator + +from . import errors as e +from .abc import Params, Query +from .rows import Row, RowFactory +from .cursor import Cursor +from ._compat import Self +from ._server_cursor_base import ServerCursorMixin + +if TYPE_CHECKING: + from .connection import Connection + + +class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]): + __module__ = "psycopg" + __slots__ = () + + @overload + def __init__( + self, + connection: Connection[Row], + name: str, + *, + scrollable: bool | None = None, + withhold: bool = False, + ): ... + + @overload + def __init__( + self, + connection: Connection[Any], + name: str, + *, + row_factory: RowFactory[Row], + scrollable: bool | None = None, + withhold: bool = False, + ): ... + + def __init__( + self, + connection: Connection[Any], + name: str, + *, + row_factory: RowFactory[Row] | None = None, + scrollable: bool | None = None, + withhold: bool = False, + ): + Cursor.__init__( + self, connection, row_factory=row_factory or connection.row_factory + ) + ServerCursorMixin.__init__(self, name, scrollable, withhold) + + def __del__(self) -> None: + if not self.closed: + warn( + f"the server-side cursor {self} was deleted while still open." + " Please use 'with' or '.close()' to close the cursor properly", + ResourceWarning, + ) + + def close(self) -> None: + """ + Close the current cursor and free associated resources. + """ + with self._conn.lock: + if self.closed: + return + if not self._conn.closed: + self._conn.wait(self._close_gen()) + super().close() + + def execute( + self, + query: Query, + params: Params | None = None, + *, + binary: bool | None = None, + **kwargs: Any, + ) -> Self: + """ + Open a cursor to execute a query to the database. + """ + if kwargs: + raise TypeError(f"keyword not supported: {list(kwargs)[0]}") + if self._pgconn.pipeline_status: + raise e.NotSupportedError( + "server-side cursors not supported in pipeline mode" + ) + + try: + with self._conn.lock: + self._conn.wait(self._declare_gen(query, params, binary)) + except e._NO_TRACEBACK as ex: + raise ex.with_traceback(None) + + return self + + def executemany( + self, query: Query, params_seq: Iterable[Params], *, returning: bool = True + ) -> None: + """Method not implemented for server-side cursors.""" + raise e.NotSupportedError("executemany not supported on server-side cursors") + + def fetchone(self) -> Row | None: + with self._conn.lock: + recs = self._conn.wait(self._fetch_gen(1)) + if recs: + self._pos += 1 + return recs[0] + else: + return None + + def fetchmany(self, size: int = 0) -> list[Row]: + if not size: + size = self.arraysize + with self._conn.lock: + recs = self._conn.wait(self._fetch_gen(size)) + self._pos += len(recs) + return recs + + def fetchall(self) -> list[Row]: + with self._conn.lock: + recs = self._conn.wait(self._fetch_gen(None)) + self._pos += len(recs) + return recs + + def __iter__(self) -> Iterator[Row]: + while True: + with self._conn.lock: + recs = self._conn.wait(self._fetch_gen(self.itersize)) + for rec in recs: + self._pos += 1 + yield rec + if len(recs) < self.itersize: + break + + def scroll(self, value: int, mode: str = "relative") -> None: + with self._conn.lock: + self._conn.wait(self._scroll_gen(value, mode)) + # Postgres doesn't have a reliable way to report a cursor out of bound + if mode == "relative": + self._pos += value + else: + self._pos = value diff --git a/psycopg/psycopg/_server_cursor_async.py b/psycopg/psycopg/_server_cursor_async.py new file mode 100644 index 000000000..a64168e3b --- /dev/null +++ b/psycopg/psycopg/_server_cursor_async.py @@ -0,0 +1,144 @@ +""" +psycopg async server-side cursor objects. +""" + +# Copyright (C) 2020 The Psycopg Team + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, overload +from warnings import warn +from collections.abc import AsyncIterator, Iterable + +from . import errors as e +from .abc import Params, Query +from .rows import AsyncRowFactory, Row +from ._compat import Self +from .cursor_async import AsyncCursor +from ._server_cursor_base import ServerCursorMixin + +if TYPE_CHECKING: + from .connection_async import AsyncConnection + + +class AsyncServerCursor( + ServerCursorMixin["AsyncConnection[Any]", Row], AsyncCursor[Row] +): + __module__ = "psycopg" + __slots__ = () + + @overload + def __init__( + self, + connection: AsyncConnection[Row], + name: str, + *, + scrollable: bool | None = None, + withhold: bool = False, + ): ... + + @overload + def __init__( + self, + connection: AsyncConnection[Any], + name: str, + *, + row_factory: AsyncRowFactory[Row], + scrollable: bool | None = None, + withhold: bool = False, + ): ... + + def __init__( + self, + connection: AsyncConnection[Any], + name: str, + *, + row_factory: AsyncRowFactory[Row] | None = None, + scrollable: bool | None = None, + withhold: bool = False, + ): + AsyncCursor.__init__( + self, connection, row_factory=row_factory or connection.row_factory + ) + ServerCursorMixin.__init__(self, name, scrollable, withhold) + + def __del__(self) -> None: + if not self.closed: + warn( + f"the server-side cursor {self} was deleted while still open." + " Please use 'with' or '.close()' to close the cursor properly", + ResourceWarning, + ) + + async def close(self) -> None: + async with self._conn.lock: + if self.closed: + return + if not self._conn.closed: + await self._conn.wait(self._close_gen()) + await super().close() + + async def execute( + self, + query: Query, + params: Params | None = None, + *, + binary: bool | None = None, + **kwargs: Any, + ) -> Self: + if kwargs: + raise TypeError(f"keyword not supported: {list(kwargs)[0]}") + if self._pgconn.pipeline_status: + raise e.NotSupportedError( + "server-side cursors not supported in pipeline mode" + ) + + try: + async with self._conn.lock: + await self._conn.wait(self._declare_gen(query, params, binary)) + except e._NO_TRACEBACK as ex: + raise ex.with_traceback(None) + + return self + + async def executemany( + self, query: Query, params_seq: Iterable[Params], *, returning: bool = True + ) -> None: + raise e.NotSupportedError("executemany not supported on server-side cursors") + + async def fetchone(self) -> Row | None: + async with self._conn.lock: + recs = await self._conn.wait(self._fetch_gen(1)) + if recs: + self._pos += 1 + return recs[0] + else: + return None + + async def fetchmany(self, size: int = 0) -> list[Row]: + if not size: + size = self.arraysize + async with self._conn.lock: + recs = await self._conn.wait(self._fetch_gen(size)) + self._pos += len(recs) + return recs + + async def fetchall(self) -> list[Row]: + async with self._conn.lock: + recs = await self._conn.wait(self._fetch_gen(None)) + self._pos += len(recs) + return recs + + async def __aiter__(self) -> AsyncIterator[Row]: + while True: + async with self._conn.lock: + recs = await self._conn.wait(self._fetch_gen(self.itersize)) + for rec in recs: + self._pos += 1 + yield rec + if len(recs) < self.itersize: + break + + async def scroll(self, value: int, mode: str = "relative") -> None: + async with self._conn.lock: + await self._conn.wait(self._scroll_gen(value, mode)) diff --git a/psycopg/psycopg/_server_cursor_base.py b/psycopg/psycopg/_server_cursor_base.py new file mode 100644 index 000000000..4d1fba0d5 --- /dev/null +++ b/psycopg/psycopg/_server_cursor_base.py @@ -0,0 +1,189 @@ +""" +psycopg server-side cursor base objects. +""" + +# Copyright (C) 2020 The Psycopg Team + +from __future__ import annotations + +from . import errors as e +from . import pq, sql +from .abc import ConnectionType, Params, PQGen, Query +from .rows import Row +from .generators import execute +from ._cursor_base import BaseCursor + +DEFAULT_ITERSIZE = 100 + +TEXT = pq.Format.TEXT +BINARY = pq.Format.BINARY + +COMMAND_OK = pq.ExecStatus.COMMAND_OK +TUPLES_OK = pq.ExecStatus.TUPLES_OK + +IDLE = pq.TransactionStatus.IDLE +INTRANS = pq.TransactionStatus.INTRANS + + +class ServerCursorMixin(BaseCursor[ConnectionType, Row]): + """Mixin to add ServerCursor behaviour and implementation a BaseCursor.""" + + __slots__ = "_name _scrollable _withhold _described itersize _format".split() + + def __init__(self, name: str, scrollable: bool | None, withhold: bool): + self._name = name + self._scrollable = scrollable + self._withhold = withhold + self._described = False + self.itersize: int = DEFAULT_ITERSIZE + self._format = TEXT + + def __repr__(self) -> str: + # Insert the name as the second word + parts = super().__repr__().split(None, 1) + parts.insert(1, f"{self._name!r}") + return " ".join(parts) + + @property + def name(self) -> str: + """The name of the cursor.""" + return self._name + + @property + def scrollable(self) -> bool | None: + """ + Whether the cursor is scrollable or not. + + If `!None` leave the choice to the server. Use `!True` if you want to + use `scroll()` on the cursor. + """ + return self._scrollable + + @property + def withhold(self) -> bool: + """ + If the cursor can be used after the creating transaction has committed. + """ + return self._withhold + + @property + def rownumber(self) -> int | None: + """Index of the next row to fetch in the current result. + + `!None` if there is no result to fetch. + """ + res = self.pgresult + # command_status is empty if the result comes from + # describe_portal, which means that we have just executed the DECLARE, + # so we can assume we are at the first row. + tuples = res and (res.status == TUPLES_OK or res.command_status == b"") + return self._pos if tuples else None + + def _declare_gen( + self, query: Query, params: Params | None = None, binary: bool | None = None + ) -> PQGen[None]: + """Generator implementing `ServerCursor.execute()`.""" + + query = self._make_declare_statement(query) + + # If the cursor is being reused, the previous one must be closed. + if self._described: + yield from self._close_gen() + self._described = False + + yield from self._start_query(query) + pgq = self._convert_query(query, params) + self._execute_send(pgq, force_extended=True) + results = yield from execute(self._conn.pgconn) + if results[-1].status != COMMAND_OK: + self._raise_for_result(results[-1]) + + # Set the format, which will be used by describe and fetch operations + if binary is None: + self._format = self.format + else: + self._format = BINARY if binary else TEXT + + # The above result only returned COMMAND_OK. Get the cursor shape + yield from self._describe_gen() + + def _describe_gen(self) -> PQGen[None]: + self._pgconn.send_describe_portal(self._name.encode(self._encoding)) + results = yield from execute(self._pgconn) + self._check_results(results) + self._results = results + self._select_current_result(0, format=self._format) + self._described = True + + def _close_gen(self) -> PQGen[None]: + ts = self._conn.pgconn.transaction_status + + # if the connection is not in a sane state, don't even try + if ts != IDLE and ts != INTRANS: + return + + # If we are IDLE, a WITHOUT HOLD cursor will surely have gone already. + if not self._withhold and ts == IDLE: + return + + # if we didn't declare the cursor ourselves we still have to close it + # but we must make sure it exists. + if not self._described: + query = sql.SQL( + "SELECT 1 FROM pg_catalog.pg_cursors WHERE name = {}" + ).format(sql.Literal(self._name)) + res = yield from self._conn._exec_command(query) + # pipeline mode otherwise, unsupported here. + assert res is not None + if res.ntuples == 0: + return + + query = sql.SQL("CLOSE {}").format(sql.Identifier(self._name)) + yield from self._conn._exec_command(query) + + def _fetch_gen(self, num: int | None) -> PQGen[list[Row]]: + if self.closed: + raise e.InterfaceError("the cursor is closed") + # If we are stealing the cursor, make sure we know its shape + if not self._described: + yield from self._start_query() + yield from self._describe_gen() + + query = sql.SQL("FETCH FORWARD {} FROM {}").format( + sql.SQL("ALL") if num is None else sql.Literal(num), + sql.Identifier(self._name), + ) + res = yield from self._conn._exec_command(query, result_format=self._format) + # pipeline mode otherwise, unsupported here. + assert res is not None + + self.pgresult = res + self._tx.set_pgresult(res, set_loaders=False) + return self._tx.load_rows(0, res.ntuples, self._make_row) + + def _scroll_gen(self, value: int, mode: str) -> PQGen[None]: + if mode not in ("relative", "absolute"): + raise ValueError(f"bad mode: {mode}. It should be 'relative' or 'absolute'") + query = sql.SQL("MOVE{} {} FROM {}").format( + sql.SQL(" ABSOLUTE" if mode == "absolute" else ""), + sql.Literal(value), + sql.Identifier(self._name), + ) + yield from self._conn._exec_command(query) + + def _make_declare_statement(self, query: Query) -> sql.Composed: + if isinstance(query, bytes): + query = query.decode(self._encoding) + if not isinstance(query, sql.Composable): + query = sql.SQL(query) + + parts = [sql.SQL("DECLARE"), sql.Identifier(self._name)] + if self._scrollable is not None: + parts.append(sql.SQL("SCROLL" if self._scrollable else "NO SCROLL")) + parts.append(sql.SQL("CURSOR")) + if self._withhold: + parts.append(sql.SQL("WITH HOLD")) + parts.append(sql.SQL("FOR")) + parts.append(query) + + return sql.SQL(" ").join(parts) diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index bfff3fc1e..e1e4eeefc 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -31,7 +31,7 @@ from ._pipeline import Pipeline from .generators import notifies from .transaction import Transaction from ._capabilities import capabilities -from .server_cursor import ServerCursor +from ._server_cursor import ServerCursor from ._connection_base import BaseConnection, CursorRow, Notify if TYPE_CHECKING: diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 9f4ecb464..f5e95e4aa 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -27,8 +27,8 @@ from .generators import notifies from .transaction import AsyncTransaction from .cursor_async import AsyncCursor from ._capabilities import capabilities -from .server_cursor import AsyncServerCursor from ._connection_base import BaseConnection, CursorRow, Notify +from ._server_cursor_async import AsyncServerCursor if True: # ASYNC import sys diff --git a/psycopg/psycopg/raw_cursor.py b/psycopg/psycopg/raw_cursor.py index 5f42d7013..eb9cb4d75 100644 --- a/psycopg/psycopg/raw_cursor.py +++ b/psycopg/psycopg/raw_cursor.py @@ -16,7 +16,8 @@ from .cursor import Cursor from ._queries import PostgresQuery from ._cursor_base import BaseCursor from .cursor_async import AsyncCursor -from .server_cursor import AsyncServerCursor, ServerCursor +from ._server_cursor import ServerCursor +from ._server_cursor_async import AsyncServerCursor if TYPE_CHECKING: from typing import Any # noqa: F401 diff --git a/psycopg/psycopg/server_cursor.py b/psycopg/psycopg/server_cursor.py deleted file mode 100644 index bd30d9c87..000000000 --- a/psycopg/psycopg/server_cursor.py +++ /dev/null @@ -1,455 +0,0 @@ -""" -psycopg server-side cursor objects. -""" - -# Copyright (C) 2020 The Psycopg Team - -from __future__ import annotations - -from typing import TYPE_CHECKING, Any, AsyncIterator, Iterable, Iterator, overload -from warnings import warn - -from . import errors as e -from . import pq, sql -from .abc import ConnectionType, Params, PQGen, Query -from .rows import AsyncRowFactory, Row, RowFactory -from .cursor import Cursor -from ._compat import Self -from .generators import execute -from ._cursor_base import BaseCursor -from .cursor_async import AsyncCursor - -if TYPE_CHECKING: - from .connection import Connection - from .connection_async import AsyncConnection - -DEFAULT_ITERSIZE = 100 - -TEXT = pq.Format.TEXT -BINARY = pq.Format.BINARY - -COMMAND_OK = pq.ExecStatus.COMMAND_OK -TUPLES_OK = pq.ExecStatus.TUPLES_OK - -IDLE = pq.TransactionStatus.IDLE -INTRANS = pq.TransactionStatus.INTRANS - - -class ServerCursorMixin(BaseCursor[ConnectionType, Row]): - """Mixin to add ServerCursor behaviour and implementation a BaseCursor.""" - - __slots__ = "_name _scrollable _withhold _described itersize _format".split() - - def __init__(self, name: str, scrollable: bool | None, withhold: bool): - self._name = name - self._scrollable = scrollable - self._withhold = withhold - self._described = False - self.itersize: int = DEFAULT_ITERSIZE - self._format = TEXT - - def __repr__(self) -> str: - # Insert the name as the second word - parts = super().__repr__().split(None, 1) - parts.insert(1, f"{self._name!r}") - return " ".join(parts) - - @property - def name(self) -> str: - """The name of the cursor.""" - return self._name - - @property - def scrollable(self) -> bool | None: - """ - Whether the cursor is scrollable or not. - - If `!None` leave the choice to the server. Use `!True` if you want to - use `scroll()` on the cursor. - """ - return self._scrollable - - @property - def withhold(self) -> bool: - """ - If the cursor can be used after the creating transaction has committed. - """ - return self._withhold - - @property - def rownumber(self) -> int | None: - """Index of the next row to fetch in the current result. - - `!None` if there is no result to fetch. - """ - res = self.pgresult - # command_status is empty if the result comes from - # describe_portal, which means that we have just executed the DECLARE, - # so we can assume we are at the first row. - tuples = res and (res.status == TUPLES_OK or res.command_status == b"") - return self._pos if tuples else None - - def _declare_gen( - self, query: Query, params: Params | None = None, binary: bool | None = None - ) -> PQGen[None]: - """Generator implementing `ServerCursor.execute()`.""" - - query = self._make_declare_statement(query) - - # If the cursor is being reused, the previous one must be closed. - if self._described: - yield from self._close_gen() - self._described = False - - yield from self._start_query(query) - pgq = self._convert_query(query, params) - self._execute_send(pgq, force_extended=True) - results = yield from execute(self._conn.pgconn) - if results[-1].status != COMMAND_OK: - self._raise_for_result(results[-1]) - - # Set the format, which will be used by describe and fetch operations - if binary is None: - self._format = self.format - else: - self._format = BINARY if binary else TEXT - - # The above result only returned COMMAND_OK. Get the cursor shape - yield from self._describe_gen() - - def _describe_gen(self) -> PQGen[None]: - self._pgconn.send_describe_portal(self._name.encode(self._encoding)) - results = yield from execute(self._pgconn) - self._check_results(results) - self._results = results - self._select_current_result(0, format=self._format) - self._described = True - - def _close_gen(self) -> PQGen[None]: - ts = self._conn.pgconn.transaction_status - - # if the connection is not in a sane state, don't even try - if ts != IDLE and ts != INTRANS: - return - - # If we are IDLE, a WITHOUT HOLD cursor will surely have gone already. - if not self._withhold and ts == IDLE: - return - - # if we didn't declare the cursor ourselves we still have to close it - # but we must make sure it exists. - if not self._described: - query = sql.SQL( - "SELECT 1 FROM pg_catalog.pg_cursors WHERE name = {}" - ).format(sql.Literal(self._name)) - res = yield from self._conn._exec_command(query) - # pipeline mode otherwise, unsupported here. - assert res is not None - if res.ntuples == 0: - return - - query = sql.SQL("CLOSE {}").format(sql.Identifier(self._name)) - yield from self._conn._exec_command(query) - - def _fetch_gen(self, num: int | None) -> PQGen[list[Row]]: - if self.closed: - raise e.InterfaceError("the cursor is closed") - # If we are stealing the cursor, make sure we know its shape - if not self._described: - yield from self._start_query() - yield from self._describe_gen() - - query = sql.SQL("FETCH FORWARD {} FROM {}").format( - sql.SQL("ALL") if num is None else sql.Literal(num), - sql.Identifier(self._name), - ) - res = yield from self._conn._exec_command(query, result_format=self._format) - # pipeline mode otherwise, unsupported here. - assert res is not None - - self.pgresult = res - self._tx.set_pgresult(res, set_loaders=False) - return self._tx.load_rows(0, res.ntuples, self._make_row) - - def _scroll_gen(self, value: int, mode: str) -> PQGen[None]: - if mode not in ("relative", "absolute"): - raise ValueError(f"bad mode: {mode}. It should be 'relative' or 'absolute'") - query = sql.SQL("MOVE{} {} FROM {}").format( - sql.SQL(" ABSOLUTE" if mode == "absolute" else ""), - sql.Literal(value), - sql.Identifier(self._name), - ) - yield from self._conn._exec_command(query) - - def _make_declare_statement(self, query: Query) -> sql.Composed: - if isinstance(query, bytes): - query = query.decode(self._encoding) - if not isinstance(query, sql.Composable): - query = sql.SQL(query) - - parts = [sql.SQL("DECLARE"), sql.Identifier(self._name)] - if self._scrollable is not None: - parts.append(sql.SQL("SCROLL" if self._scrollable else "NO SCROLL")) - parts.append(sql.SQL("CURSOR")) - if self._withhold: - parts.append(sql.SQL("WITH HOLD")) - parts.append(sql.SQL("FOR")) - parts.append(query) - - return sql.SQL(" ").join(parts) - - -class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]): - __module__ = "psycopg" - __slots__ = () - - @overload - def __init__( - self, - connection: Connection[Row], - name: str, - *, - scrollable: bool | None = None, - withhold: bool = False, - ): ... - - @overload - def __init__( - self, - connection: Connection[Any], - name: str, - *, - row_factory: RowFactory[Row], - scrollable: bool | None = None, - withhold: bool = False, - ): ... - - def __init__( - self, - connection: Connection[Any], - name: str, - *, - row_factory: RowFactory[Row] | None = None, - scrollable: bool | None = None, - withhold: bool = False, - ): - Cursor.__init__( - self, connection, row_factory=row_factory or connection.row_factory - ) - ServerCursorMixin.__init__(self, name, scrollable, withhold) - - def __del__(self) -> None: - if not self.closed: - warn( - f"the server-side cursor {self} was deleted while still open." - " Please use 'with' or '.close()' to close the cursor properly", - ResourceWarning, - ) - - def close(self) -> None: - """ - Close the current cursor and free associated resources. - """ - with self._conn.lock: - if self.closed: - return - if not self._conn.closed: - self._conn.wait(self._close_gen()) - super().close() - - def execute( - self, - query: Query, - params: Params | None = None, - *, - binary: bool | None = None, - **kwargs: Any, - ) -> Self: - """ - Open a cursor to execute a query to the database. - """ - if kwargs: - raise TypeError(f"keyword not supported: {list(kwargs)[0]}") - if self._pgconn.pipeline_status: - raise e.NotSupportedError( - "server-side cursors not supported in pipeline mode" - ) - - try: - with self._conn.lock: - self._conn.wait(self._declare_gen(query, params, binary)) - except e._NO_TRACEBACK as ex: - raise ex.with_traceback(None) - - return self - - def executemany( - self, query: Query, params_seq: Iterable[Params], *, returning: bool = True - ) -> None: - """Method not implemented for server-side cursors.""" - raise e.NotSupportedError("executemany not supported on server-side cursors") - - def fetchone(self) -> Row | None: - with self._conn.lock: - recs = self._conn.wait(self._fetch_gen(1)) - if recs: - self._pos += 1 - return recs[0] - else: - return None - - def fetchmany(self, size: int = 0) -> list[Row]: - if not size: - size = self.arraysize - with self._conn.lock: - recs = self._conn.wait(self._fetch_gen(size)) - self._pos += len(recs) - return recs - - def fetchall(self) -> list[Row]: - with self._conn.lock: - recs = self._conn.wait(self._fetch_gen(None)) - self._pos += len(recs) - return recs - - def __iter__(self) -> Iterator[Row]: - while True: - with self._conn.lock: - recs = self._conn.wait(self._fetch_gen(self.itersize)) - for rec in recs: - self._pos += 1 - yield rec - if len(recs) < self.itersize: - break - - def scroll(self, value: int, mode: str = "relative") -> None: - with self._conn.lock: - self._conn.wait(self._scroll_gen(value, mode)) - # Postgres doesn't have a reliable way to report a cursor out of bound - if mode == "relative": - self._pos += value - else: - self._pos = value - - -class AsyncServerCursor( - ServerCursorMixin["AsyncConnection[Any]", Row], AsyncCursor[Row] -): - __module__ = "psycopg" - __slots__ = () - - @overload - def __init__( - self, - connection: AsyncConnection[Row], - name: str, - *, - scrollable: bool | None = None, - withhold: bool = False, - ): ... - - @overload - def __init__( - self, - connection: AsyncConnection[Any], - name: str, - *, - row_factory: AsyncRowFactory[Row], - scrollable: bool | None = None, - withhold: bool = False, - ): ... - - def __init__( - self, - connection: AsyncConnection[Any], - name: str, - *, - row_factory: AsyncRowFactory[Row] | None = None, - scrollable: bool | None = None, - withhold: bool = False, - ): - AsyncCursor.__init__( - self, connection, row_factory=row_factory or connection.row_factory - ) - ServerCursorMixin.__init__(self, name, scrollable, withhold) - - def __del__(self) -> None: - if not self.closed: - warn( - f"the server-side cursor {self} was deleted while still open." - " Please use 'with' or '.close()' to close the cursor properly", - ResourceWarning, - ) - - async def close(self) -> None: - async with self._conn.lock: - if self.closed: - return - if not self._conn.closed: - await self._conn.wait(self._close_gen()) - await super().close() - - async def execute( - self, - query: Query, - params: Params | None = None, - *, - binary: bool | None = None, - **kwargs: Any, - ) -> Self: - if kwargs: - raise TypeError(f"keyword not supported: {list(kwargs)[0]}") - if self._pgconn.pipeline_status: - raise e.NotSupportedError( - "server-side cursors not supported in pipeline mode" - ) - - try: - async with self._conn.lock: - await self._conn.wait(self._declare_gen(query, params, binary)) - except e._NO_TRACEBACK as ex: - raise ex.with_traceback(None) - - return self - - async def executemany( - self, query: Query, params_seq: Iterable[Params], *, returning: bool = True - ) -> None: - raise e.NotSupportedError("executemany not supported on server-side cursors") - - async def fetchone(self) -> Row | None: - async with self._conn.lock: - recs = await self._conn.wait(self._fetch_gen(1)) - if recs: - self._pos += 1 - return recs[0] - else: - return None - - async def fetchmany(self, size: int = 0) -> list[Row]: - if not size: - size = self.arraysize - async with self._conn.lock: - recs = await self._conn.wait(self._fetch_gen(size)) - self._pos += len(recs) - return recs - - async def fetchall(self) -> list[Row]: - async with self._conn.lock: - recs = await self._conn.wait(self._fetch_gen(None)) - self._pos += len(recs) - return recs - - async def __aiter__(self) -> AsyncIterator[Row]: - while True: - async with self._conn.lock: - recs = await self._conn.wait(self._fetch_gen(self.itersize)) - for rec in recs: - self._pos += 1 - yield rec - if len(recs) < self.itersize: - break - - async def scroll(self, value: int, mode: str = "relative") -> None: - async with self._conn.lock: - await self._conn.wait(self._scroll_gen(value, mode)) -- 2.47.2