]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor: split async/sync server cursor in two modules
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 4 May 2025 16:30:34 +0000 (18:30 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 4 May 2025 18:54:06 +0000 (20:54 +0200)
psycopg/psycopg/__init__.py
psycopg/psycopg/_server_cursor.py [new file with mode: 0644]
psycopg/psycopg/_server_cursor_async.py [new file with mode: 0644]
psycopg/psycopg/_server_cursor_base.py [new file with mode: 0644]
psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/raw_cursor.py
psycopg/psycopg/server_cursor.py [deleted file]

index 581d9b23be230c15b5abb394e8301938a009560a..10f6f59e96c53d35e7865b3566d537ff291c58e8 100644 (file)
@@ -26,10 +26,11 @@ from .transaction import AsyncTransaction, Rollback, Transaction
 from .cursor_async import AsyncCursor
 from ._capabilities import Capabilities, capabilities
 from .client_cursor import AsyncClientCursor, ClientCursor
-from .server_cursor import AsyncServerCursor, ServerCursor
+from ._server_cursor import ServerCursor
 from ._connection_base import BaseConnection, Notify
 from ._connection_info import ConnectionInfo
 from .connection_async import AsyncConnection
+from ._server_cursor_async import AsyncServerCursor
 
 # Set the logger to a quiet default, can be enabled if needed
 if (logger := logging.getLogger("psycopg")).level == logging.NOTSET:
diff --git a/psycopg/psycopg/_server_cursor.py b/psycopg/psycopg/_server_cursor.py
new file mode 100644 (file)
index 0000000..86d0b87
--- /dev/null
@@ -0,0 +1,154 @@
+"""
+psycopg server-side cursor objects.
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, overload
+from warnings import warn
+from collections.abc import Iterable, Iterator
+
+from . import errors as e
+from .abc import Params, Query
+from .rows import Row, RowFactory
+from .cursor import Cursor
+from ._compat import Self
+from ._server_cursor_base import ServerCursorMixin
+
+if TYPE_CHECKING:
+    from .connection import Connection
+
+
+class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]):
+    __module__ = "psycopg"
+    __slots__ = ()
+
+    @overload
+    def __init__(
+        self,
+        connection: Connection[Row],
+        name: str,
+        *,
+        scrollable: bool | None = None,
+        withhold: bool = False,
+    ): ...
+
+    @overload
+    def __init__(
+        self,
+        connection: Connection[Any],
+        name: str,
+        *,
+        row_factory: RowFactory[Row],
+        scrollable: bool | None = None,
+        withhold: bool = False,
+    ): ...
+
+    def __init__(
+        self,
+        connection: Connection[Any],
+        name: str,
+        *,
+        row_factory: RowFactory[Row] | None = None,
+        scrollable: bool | None = None,
+        withhold: bool = False,
+    ):
+        Cursor.__init__(
+            self, connection, row_factory=row_factory or connection.row_factory
+        )
+        ServerCursorMixin.__init__(self, name, scrollable, withhold)
+
+    def __del__(self) -> None:
+        if not self.closed:
+            warn(
+                f"the server-side cursor {self} was deleted while still open."
+                " Please use 'with' or '.close()' to close the cursor properly",
+                ResourceWarning,
+            )
+
+    def close(self) -> None:
+        """
+        Close the current cursor and free associated resources.
+        """
+        with self._conn.lock:
+            if self.closed:
+                return
+            if not self._conn.closed:
+                self._conn.wait(self._close_gen())
+            super().close()
+
+    def execute(
+        self,
+        query: Query,
+        params: Params | None = None,
+        *,
+        binary: bool | None = None,
+        **kwargs: Any,
+    ) -> Self:
+        """
+        Open a cursor to execute a query to the database.
+        """
+        if kwargs:
+            raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
+        if self._pgconn.pipeline_status:
+            raise e.NotSupportedError(
+                "server-side cursors not supported in pipeline mode"
+            )
+
+        try:
+            with self._conn.lock:
+                self._conn.wait(self._declare_gen(query, params, binary))
+        except e._NO_TRACEBACK as ex:
+            raise ex.with_traceback(None)
+
+        return self
+
+    def executemany(
+        self, query: Query, params_seq: Iterable[Params], *, returning: bool = True
+    ) -> None:
+        """Method not implemented for server-side cursors."""
+        raise e.NotSupportedError("executemany not supported on server-side cursors")
+
+    def fetchone(self) -> Row | None:
+        with self._conn.lock:
+            recs = self._conn.wait(self._fetch_gen(1))
+        if recs:
+            self._pos += 1
+            return recs[0]
+        else:
+            return None
+
+    def fetchmany(self, size: int = 0) -> list[Row]:
+        if not size:
+            size = self.arraysize
+        with self._conn.lock:
+            recs = self._conn.wait(self._fetch_gen(size))
+        self._pos += len(recs)
+        return recs
+
+    def fetchall(self) -> list[Row]:
+        with self._conn.lock:
+            recs = self._conn.wait(self._fetch_gen(None))
+        self._pos += len(recs)
+        return recs
+
+    def __iter__(self) -> Iterator[Row]:
+        while True:
+            with self._conn.lock:
+                recs = self._conn.wait(self._fetch_gen(self.itersize))
+            for rec in recs:
+                self._pos += 1
+                yield rec
+            if len(recs) < self.itersize:
+                break
+
+    def scroll(self, value: int, mode: str = "relative") -> None:
+        with self._conn.lock:
+            self._conn.wait(self._scroll_gen(value, mode))
+        # Postgres doesn't have a reliable way to report a cursor out of bound
+        if mode == "relative":
+            self._pos += value
+        else:
+            self._pos = value
diff --git a/psycopg/psycopg/_server_cursor_async.py b/psycopg/psycopg/_server_cursor_async.py
new file mode 100644 (file)
index 0000000..a64168e
--- /dev/null
@@ -0,0 +1,144 @@
+"""
+psycopg async server-side cursor objects.
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, overload
+from warnings import warn
+from collections.abc import AsyncIterator, Iterable
+
+from . import errors as e
+from .abc import Params, Query
+from .rows import AsyncRowFactory, Row
+from ._compat import Self
+from .cursor_async import AsyncCursor
+from ._server_cursor_base import ServerCursorMixin
+
+if TYPE_CHECKING:
+    from .connection_async import AsyncConnection
+
+
+class AsyncServerCursor(
+    ServerCursorMixin["AsyncConnection[Any]", Row], AsyncCursor[Row]
+):
+    __module__ = "psycopg"
+    __slots__ = ()
+
+    @overload
+    def __init__(
+        self,
+        connection: AsyncConnection[Row],
+        name: str,
+        *,
+        scrollable: bool | None = None,
+        withhold: bool = False,
+    ): ...
+
+    @overload
+    def __init__(
+        self,
+        connection: AsyncConnection[Any],
+        name: str,
+        *,
+        row_factory: AsyncRowFactory[Row],
+        scrollable: bool | None = None,
+        withhold: bool = False,
+    ): ...
+
+    def __init__(
+        self,
+        connection: AsyncConnection[Any],
+        name: str,
+        *,
+        row_factory: AsyncRowFactory[Row] | None = None,
+        scrollable: bool | None = None,
+        withhold: bool = False,
+    ):
+        AsyncCursor.__init__(
+            self, connection, row_factory=row_factory or connection.row_factory
+        )
+        ServerCursorMixin.__init__(self, name, scrollable, withhold)
+
+    def __del__(self) -> None:
+        if not self.closed:
+            warn(
+                f"the server-side cursor {self} was deleted while still open."
+                " Please use 'with' or '.close()' to close the cursor properly",
+                ResourceWarning,
+            )
+
+    async def close(self) -> None:
+        async with self._conn.lock:
+            if self.closed:
+                return
+            if not self._conn.closed:
+                await self._conn.wait(self._close_gen())
+            await super().close()
+
+    async def execute(
+        self,
+        query: Query,
+        params: Params | None = None,
+        *,
+        binary: bool | None = None,
+        **kwargs: Any,
+    ) -> Self:
+        if kwargs:
+            raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
+        if self._pgconn.pipeline_status:
+            raise e.NotSupportedError(
+                "server-side cursors not supported in pipeline mode"
+            )
+
+        try:
+            async with self._conn.lock:
+                await self._conn.wait(self._declare_gen(query, params, binary))
+        except e._NO_TRACEBACK as ex:
+            raise ex.with_traceback(None)
+
+        return self
+
+    async def executemany(
+        self, query: Query, params_seq: Iterable[Params], *, returning: bool = True
+    ) -> None:
+        raise e.NotSupportedError("executemany not supported on server-side cursors")
+
+    async def fetchone(self) -> Row | None:
+        async with self._conn.lock:
+            recs = await self._conn.wait(self._fetch_gen(1))
+        if recs:
+            self._pos += 1
+            return recs[0]
+        else:
+            return None
+
+    async def fetchmany(self, size: int = 0) -> list[Row]:
+        if not size:
+            size = self.arraysize
+        async with self._conn.lock:
+            recs = await self._conn.wait(self._fetch_gen(size))
+        self._pos += len(recs)
+        return recs
+
+    async def fetchall(self) -> list[Row]:
+        async with self._conn.lock:
+            recs = await self._conn.wait(self._fetch_gen(None))
+        self._pos += len(recs)
+        return recs
+
+    async def __aiter__(self) -> AsyncIterator[Row]:
+        while True:
+            async with self._conn.lock:
+                recs = await self._conn.wait(self._fetch_gen(self.itersize))
+            for rec in recs:
+                self._pos += 1
+                yield rec
+            if len(recs) < self.itersize:
+                break
+
+    async def scroll(self, value: int, mode: str = "relative") -> None:
+        async with self._conn.lock:
+            await self._conn.wait(self._scroll_gen(value, mode))
diff --git a/psycopg/psycopg/_server_cursor_base.py b/psycopg/psycopg/_server_cursor_base.py
new file mode 100644 (file)
index 0000000..4d1fba0
--- /dev/null
@@ -0,0 +1,189 @@
+"""
+psycopg server-side cursor base objects.
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+from __future__ import annotations
+
+from . import errors as e
+from . import pq, sql
+from .abc import ConnectionType, Params, PQGen, Query
+from .rows import Row
+from .generators import execute
+from ._cursor_base import BaseCursor
+
+DEFAULT_ITERSIZE = 100
+
+TEXT = pq.Format.TEXT
+BINARY = pq.Format.BINARY
+
+COMMAND_OK = pq.ExecStatus.COMMAND_OK
+TUPLES_OK = pq.ExecStatus.TUPLES_OK
+
+IDLE = pq.TransactionStatus.IDLE
+INTRANS = pq.TransactionStatus.INTRANS
+
+
+class ServerCursorMixin(BaseCursor[ConnectionType, Row]):
+    """Mixin to add ServerCursor behaviour and implementation a BaseCursor."""
+
+    __slots__ = "_name _scrollable _withhold _described itersize _format".split()
+
+    def __init__(self, name: str, scrollable: bool | None, withhold: bool):
+        self._name = name
+        self._scrollable = scrollable
+        self._withhold = withhold
+        self._described = False
+        self.itersize: int = DEFAULT_ITERSIZE
+        self._format = TEXT
+
+    def __repr__(self) -> str:
+        # Insert the name as the second word
+        parts = super().__repr__().split(None, 1)
+        parts.insert(1, f"{self._name!r}")
+        return " ".join(parts)
+
+    @property
+    def name(self) -> str:
+        """The name of the cursor."""
+        return self._name
+
+    @property
+    def scrollable(self) -> bool | None:
+        """
+        Whether the cursor is scrollable or not.
+
+        If `!None` leave the choice to the server. Use `!True` if you want to
+        use `scroll()` on the cursor.
+        """
+        return self._scrollable
+
+    @property
+    def withhold(self) -> bool:
+        """
+        If the cursor can be used after the creating transaction has committed.
+        """
+        return self._withhold
+
+    @property
+    def rownumber(self) -> int | None:
+        """Index of the next row to fetch in the current result.
+
+        `!None` if there is no result to fetch.
+        """
+        res = self.pgresult
+        # command_status is empty if the result comes from
+        # describe_portal, which means that we have just executed the DECLARE,
+        # so we can assume we are at the first row.
+        tuples = res and (res.status == TUPLES_OK or res.command_status == b"")
+        return self._pos if tuples else None
+
+    def _declare_gen(
+        self, query: Query, params: Params | None = None, binary: bool | None = None
+    ) -> PQGen[None]:
+        """Generator implementing `ServerCursor.execute()`."""
+
+        query = self._make_declare_statement(query)
+
+        # If the cursor is being reused, the previous one must be closed.
+        if self._described:
+            yield from self._close_gen()
+            self._described = False
+
+        yield from self._start_query(query)
+        pgq = self._convert_query(query, params)
+        self._execute_send(pgq, force_extended=True)
+        results = yield from execute(self._conn.pgconn)
+        if results[-1].status != COMMAND_OK:
+            self._raise_for_result(results[-1])
+
+        # Set the format, which will be used by describe and fetch operations
+        if binary is None:
+            self._format = self.format
+        else:
+            self._format = BINARY if binary else TEXT
+
+        # The above result only returned COMMAND_OK. Get the cursor shape
+        yield from self._describe_gen()
+
+    def _describe_gen(self) -> PQGen[None]:
+        self._pgconn.send_describe_portal(self._name.encode(self._encoding))
+        results = yield from execute(self._pgconn)
+        self._check_results(results)
+        self._results = results
+        self._select_current_result(0, format=self._format)
+        self._described = True
+
+    def _close_gen(self) -> PQGen[None]:
+        ts = self._conn.pgconn.transaction_status
+
+        # if the connection is not in a sane state, don't even try
+        if ts != IDLE and ts != INTRANS:
+            return
+
+        # If we are IDLE, a WITHOUT HOLD cursor will surely have gone already.
+        if not self._withhold and ts == IDLE:
+            return
+
+        # if we didn't declare the cursor ourselves we still have to close it
+        # but we must make sure it exists.
+        if not self._described:
+            query = sql.SQL(
+                "SELECT 1 FROM pg_catalog.pg_cursors WHERE name = {}"
+            ).format(sql.Literal(self._name))
+            res = yield from self._conn._exec_command(query)
+            # pipeline mode otherwise, unsupported here.
+            assert res is not None
+            if res.ntuples == 0:
+                return
+
+        query = sql.SQL("CLOSE {}").format(sql.Identifier(self._name))
+        yield from self._conn._exec_command(query)
+
+    def _fetch_gen(self, num: int | None) -> PQGen[list[Row]]:
+        if self.closed:
+            raise e.InterfaceError("the cursor is closed")
+        # If we are stealing the cursor, make sure we know its shape
+        if not self._described:
+            yield from self._start_query()
+            yield from self._describe_gen()
+
+        query = sql.SQL("FETCH FORWARD {} FROM {}").format(
+            sql.SQL("ALL") if num is None else sql.Literal(num),
+            sql.Identifier(self._name),
+        )
+        res = yield from self._conn._exec_command(query, result_format=self._format)
+        # pipeline mode otherwise, unsupported here.
+        assert res is not None
+
+        self.pgresult = res
+        self._tx.set_pgresult(res, set_loaders=False)
+        return self._tx.load_rows(0, res.ntuples, self._make_row)
+
+    def _scroll_gen(self, value: int, mode: str) -> PQGen[None]:
+        if mode not in ("relative", "absolute"):
+            raise ValueError(f"bad mode: {mode}. It should be 'relative' or 'absolute'")
+        query = sql.SQL("MOVE{} {} FROM {}").format(
+            sql.SQL(" ABSOLUTE" if mode == "absolute" else ""),
+            sql.Literal(value),
+            sql.Identifier(self._name),
+        )
+        yield from self._conn._exec_command(query)
+
+    def _make_declare_statement(self, query: Query) -> sql.Composed:
+        if isinstance(query, bytes):
+            query = query.decode(self._encoding)
+        if not isinstance(query, sql.Composable):
+            query = sql.SQL(query)
+
+        parts = [sql.SQL("DECLARE"), sql.Identifier(self._name)]
+        if self._scrollable is not None:
+            parts.append(sql.SQL("SCROLL" if self._scrollable else "NO SCROLL"))
+        parts.append(sql.SQL("CURSOR"))
+        if self._withhold:
+            parts.append(sql.SQL("WITH HOLD"))
+        parts.append(sql.SQL("FOR"))
+        parts.append(query)
+
+        return sql.SQL(" ").join(parts)
index 8a269a6f63fed3a8110a343ba0af5c970da5422e..156ff82ff025ad402b64c7922268a1f31ef02ad5 100644 (file)
@@ -32,7 +32,7 @@ from ._pipeline import Pipeline
 from .generators import notifies
 from .transaction import Transaction
 from ._capabilities import capabilities
-from .server_cursor import ServerCursor
+from ._server_cursor import ServerCursor
 from ._connection_base import BaseConnection, CursorRow, Notify
 
 if TYPE_CHECKING:
index d8baf00c7ae1cc426c3dfb25504b3ac80d0358a8..a2397ed2f95ef851888a499a23b90c85992c1677 100644 (file)
@@ -28,8 +28,8 @@ from .generators import notifies
 from .transaction import AsyncTransaction
 from .cursor_async import AsyncCursor
 from ._capabilities import capabilities
-from .server_cursor import AsyncServerCursor
 from ._connection_base import BaseConnection, CursorRow, Notify
+from ._server_cursor_async import AsyncServerCursor
 
 if True:  # ASYNC
     import sys
index 5f42d70135dfa2c9ea728f10449b679195f7a1ed..eb9cb4d751c21900b8e0fbab4d70ed9a8617bca3 100644 (file)
@@ -16,7 +16,8 @@ from .cursor import Cursor
 from ._queries import PostgresQuery
 from ._cursor_base import BaseCursor
 from .cursor_async import AsyncCursor
-from .server_cursor import AsyncServerCursor, ServerCursor
+from ._server_cursor import ServerCursor
+from ._server_cursor_async import AsyncServerCursor
 
 if TYPE_CHECKING:
     from typing import Any  # noqa: F401
diff --git a/psycopg/psycopg/server_cursor.py b/psycopg/psycopg/server_cursor.py
deleted file mode 100644 (file)
index e991b6e..0000000
+++ /dev/null
@@ -1,456 +0,0 @@
-"""
-psycopg server-side cursor objects.
-"""
-
-# Copyright (C) 2020 The Psycopg Team
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING, Any, overload
-from warnings import warn
-from collections.abc import AsyncIterator, Iterable, Iterator
-
-from . import errors as e
-from . import pq, sql
-from .abc import ConnectionType, Params, PQGen, Query
-from .rows import AsyncRowFactory, Row, RowFactory
-from .cursor import Cursor
-from ._compat import Self
-from .generators import execute
-from ._cursor_base import BaseCursor
-from .cursor_async import AsyncCursor
-
-if TYPE_CHECKING:
-    from .connection import Connection
-    from .connection_async import AsyncConnection
-
-DEFAULT_ITERSIZE = 100
-
-TEXT = pq.Format.TEXT
-BINARY = pq.Format.BINARY
-
-COMMAND_OK = pq.ExecStatus.COMMAND_OK
-TUPLES_OK = pq.ExecStatus.TUPLES_OK
-
-IDLE = pq.TransactionStatus.IDLE
-INTRANS = pq.TransactionStatus.INTRANS
-
-
-class ServerCursorMixin(BaseCursor[ConnectionType, Row]):
-    """Mixin to add ServerCursor behaviour and implementation a BaseCursor."""
-
-    __slots__ = "_name _scrollable _withhold _described itersize _format".split()
-
-    def __init__(self, name: str, scrollable: bool | None, withhold: bool):
-        self._name = name
-        self._scrollable = scrollable
-        self._withhold = withhold
-        self._described = False
-        self.itersize: int = DEFAULT_ITERSIZE
-        self._format = TEXT
-
-    def __repr__(self) -> str:
-        # Insert the name as the second word
-        parts = super().__repr__().split(None, 1)
-        parts.insert(1, f"{self._name!r}")
-        return " ".join(parts)
-
-    @property
-    def name(self) -> str:
-        """The name of the cursor."""
-        return self._name
-
-    @property
-    def scrollable(self) -> bool | None:
-        """
-        Whether the cursor is scrollable or not.
-
-        If `!None` leave the choice to the server. Use `!True` if you want to
-        use `scroll()` on the cursor.
-        """
-        return self._scrollable
-
-    @property
-    def withhold(self) -> bool:
-        """
-        If the cursor can be used after the creating transaction has committed.
-        """
-        return self._withhold
-
-    @property
-    def rownumber(self) -> int | None:
-        """Index of the next row to fetch in the current result.
-
-        `!None` if there is no result to fetch.
-        """
-        res = self.pgresult
-        # command_status is empty if the result comes from
-        # describe_portal, which means that we have just executed the DECLARE,
-        # so we can assume we are at the first row.
-        tuples = res and (res.status == TUPLES_OK or res.command_status == b"")
-        return self._pos if tuples else None
-
-    def _declare_gen(
-        self, query: Query, params: Params | None = None, binary: bool | None = None
-    ) -> PQGen[None]:
-        """Generator implementing `ServerCursor.execute()`."""
-
-        query = self._make_declare_statement(query)
-
-        # If the cursor is being reused, the previous one must be closed.
-        if self._described:
-            yield from self._close_gen()
-            self._described = False
-
-        yield from self._start_query(query)
-        pgq = self._convert_query(query, params)
-        self._execute_send(pgq, force_extended=True)
-        results = yield from execute(self._conn.pgconn)
-        if results[-1].status != COMMAND_OK:
-            self._raise_for_result(results[-1])
-
-        # Set the format, which will be used by describe and fetch operations
-        if binary is None:
-            self._format = self.format
-        else:
-            self._format = BINARY if binary else TEXT
-
-        # The above result only returned COMMAND_OK. Get the cursor shape
-        yield from self._describe_gen()
-
-    def _describe_gen(self) -> PQGen[None]:
-        self._pgconn.send_describe_portal(self._name.encode(self._encoding))
-        results = yield from execute(self._pgconn)
-        self._check_results(results)
-        self._results = results
-        self._select_current_result(0, format=self._format)
-        self._described = True
-
-    def _close_gen(self) -> PQGen[None]:
-        ts = self._conn.pgconn.transaction_status
-
-        # if the connection is not in a sane state, don't even try
-        if ts != IDLE and ts != INTRANS:
-            return
-
-        # If we are IDLE, a WITHOUT HOLD cursor will surely have gone already.
-        if not self._withhold and ts == IDLE:
-            return
-
-        # if we didn't declare the cursor ourselves we still have to close it
-        # but we must make sure it exists.
-        if not self._described:
-            query = sql.SQL(
-                "SELECT 1 FROM pg_catalog.pg_cursors WHERE name = {}"
-            ).format(sql.Literal(self._name))
-            res = yield from self._conn._exec_command(query)
-            # pipeline mode otherwise, unsupported here.
-            assert res is not None
-            if res.ntuples == 0:
-                return
-
-        query = sql.SQL("CLOSE {}").format(sql.Identifier(self._name))
-        yield from self._conn._exec_command(query)
-
-    def _fetch_gen(self, num: int | None) -> PQGen[list[Row]]:
-        if self.closed:
-            raise e.InterfaceError("the cursor is closed")
-        # If we are stealing the cursor, make sure we know its shape
-        if not self._described:
-            yield from self._start_query()
-            yield from self._describe_gen()
-
-        query = sql.SQL("FETCH FORWARD {} FROM {}").format(
-            sql.SQL("ALL") if num is None else sql.Literal(num),
-            sql.Identifier(self._name),
-        )
-        res = yield from self._conn._exec_command(query, result_format=self._format)
-        # pipeline mode otherwise, unsupported here.
-        assert res is not None
-
-        self.pgresult = res
-        self._tx.set_pgresult(res, set_loaders=False)
-        return self._tx.load_rows(0, res.ntuples, self._make_row)
-
-    def _scroll_gen(self, value: int, mode: str) -> PQGen[None]:
-        if mode not in ("relative", "absolute"):
-            raise ValueError(f"bad mode: {mode}. It should be 'relative' or 'absolute'")
-        query = sql.SQL("MOVE{} {} FROM {}").format(
-            sql.SQL(" ABSOLUTE" if mode == "absolute" else ""),
-            sql.Literal(value),
-            sql.Identifier(self._name),
-        )
-        yield from self._conn._exec_command(query)
-
-    def _make_declare_statement(self, query: Query) -> sql.Composed:
-        if isinstance(query, bytes):
-            query = query.decode(self._encoding)
-        if not isinstance(query, sql.Composable):
-            query = sql.SQL(query)
-
-        parts = [sql.SQL("DECLARE"), sql.Identifier(self._name)]
-        if self._scrollable is not None:
-            parts.append(sql.SQL("SCROLL" if self._scrollable else "NO SCROLL"))
-        parts.append(sql.SQL("CURSOR"))
-        if self._withhold:
-            parts.append(sql.SQL("WITH HOLD"))
-        parts.append(sql.SQL("FOR"))
-        parts.append(query)
-
-        return sql.SQL(" ").join(parts)
-
-
-class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]):
-    __module__ = "psycopg"
-    __slots__ = ()
-
-    @overload
-    def __init__(
-        self,
-        connection: Connection[Row],
-        name: str,
-        *,
-        scrollable: bool | None = None,
-        withhold: bool = False,
-    ): ...
-
-    @overload
-    def __init__(
-        self,
-        connection: Connection[Any],
-        name: str,
-        *,
-        row_factory: RowFactory[Row],
-        scrollable: bool | None = None,
-        withhold: bool = False,
-    ): ...
-
-    def __init__(
-        self,
-        connection: Connection[Any],
-        name: str,
-        *,
-        row_factory: RowFactory[Row] | None = None,
-        scrollable: bool | None = None,
-        withhold: bool = False,
-    ):
-        Cursor.__init__(
-            self, connection, row_factory=row_factory or connection.row_factory
-        )
-        ServerCursorMixin.__init__(self, name, scrollable, withhold)
-
-    def __del__(self) -> None:
-        if not self.closed:
-            warn(
-                f"the server-side cursor {self} was deleted while still open."
-                " Please use 'with' or '.close()' to close the cursor properly",
-                ResourceWarning,
-            )
-
-    def close(self) -> None:
-        """
-        Close the current cursor and free associated resources.
-        """
-        with self._conn.lock:
-            if self.closed:
-                return
-            if not self._conn.closed:
-                self._conn.wait(self._close_gen())
-            super().close()
-
-    def execute(
-        self,
-        query: Query,
-        params: Params | None = None,
-        *,
-        binary: bool | None = None,
-        **kwargs: Any,
-    ) -> Self:
-        """
-        Open a cursor to execute a query to the database.
-        """
-        if kwargs:
-            raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
-        if self._pgconn.pipeline_status:
-            raise e.NotSupportedError(
-                "server-side cursors not supported in pipeline mode"
-            )
-
-        try:
-            with self._conn.lock:
-                self._conn.wait(self._declare_gen(query, params, binary))
-        except e._NO_TRACEBACK as ex:
-            raise ex.with_traceback(None)
-
-        return self
-
-    def executemany(
-        self, query: Query, params_seq: Iterable[Params], *, returning: bool = True
-    ) -> None:
-        """Method not implemented for server-side cursors."""
-        raise e.NotSupportedError("executemany not supported on server-side cursors")
-
-    def fetchone(self) -> Row | None:
-        with self._conn.lock:
-            recs = self._conn.wait(self._fetch_gen(1))
-        if recs:
-            self._pos += 1
-            return recs[0]
-        else:
-            return None
-
-    def fetchmany(self, size: int = 0) -> list[Row]:
-        if not size:
-            size = self.arraysize
-        with self._conn.lock:
-            recs = self._conn.wait(self._fetch_gen(size))
-        self._pos += len(recs)
-        return recs
-
-    def fetchall(self) -> list[Row]:
-        with self._conn.lock:
-            recs = self._conn.wait(self._fetch_gen(None))
-        self._pos += len(recs)
-        return recs
-
-    def __iter__(self) -> Iterator[Row]:
-        while True:
-            with self._conn.lock:
-                recs = self._conn.wait(self._fetch_gen(self.itersize))
-            for rec in recs:
-                self._pos += 1
-                yield rec
-            if len(recs) < self.itersize:
-                break
-
-    def scroll(self, value: int, mode: str = "relative") -> None:
-        with self._conn.lock:
-            self._conn.wait(self._scroll_gen(value, mode))
-        # Postgres doesn't have a reliable way to report a cursor out of bound
-        if mode == "relative":
-            self._pos += value
-        else:
-            self._pos = value
-
-
-class AsyncServerCursor(
-    ServerCursorMixin["AsyncConnection[Any]", Row], AsyncCursor[Row]
-):
-    __module__ = "psycopg"
-    __slots__ = ()
-
-    @overload
-    def __init__(
-        self,
-        connection: AsyncConnection[Row],
-        name: str,
-        *,
-        scrollable: bool | None = None,
-        withhold: bool = False,
-    ): ...
-
-    @overload
-    def __init__(
-        self,
-        connection: AsyncConnection[Any],
-        name: str,
-        *,
-        row_factory: AsyncRowFactory[Row],
-        scrollable: bool | None = None,
-        withhold: bool = False,
-    ): ...
-
-    def __init__(
-        self,
-        connection: AsyncConnection[Any],
-        name: str,
-        *,
-        row_factory: AsyncRowFactory[Row] | None = None,
-        scrollable: bool | None = None,
-        withhold: bool = False,
-    ):
-        AsyncCursor.__init__(
-            self, connection, row_factory=row_factory or connection.row_factory
-        )
-        ServerCursorMixin.__init__(self, name, scrollable, withhold)
-
-    def __del__(self) -> None:
-        if not self.closed:
-            warn(
-                f"the server-side cursor {self} was deleted while still open."
-                " Please use 'with' or '.close()' to close the cursor properly",
-                ResourceWarning,
-            )
-
-    async def close(self) -> None:
-        async with self._conn.lock:
-            if self.closed:
-                return
-            if not self._conn.closed:
-                await self._conn.wait(self._close_gen())
-            await super().close()
-
-    async def execute(
-        self,
-        query: Query,
-        params: Params | None = None,
-        *,
-        binary: bool | None = None,
-        **kwargs: Any,
-    ) -> Self:
-        if kwargs:
-            raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
-        if self._pgconn.pipeline_status:
-            raise e.NotSupportedError(
-                "server-side cursors not supported in pipeline mode"
-            )
-
-        try:
-            async with self._conn.lock:
-                await self._conn.wait(self._declare_gen(query, params, binary))
-        except e._NO_TRACEBACK as ex:
-            raise ex.with_traceback(None)
-
-        return self
-
-    async def executemany(
-        self, query: Query, params_seq: Iterable[Params], *, returning: bool = True
-    ) -> None:
-        raise e.NotSupportedError("executemany not supported on server-side cursors")
-
-    async def fetchone(self) -> Row | None:
-        async with self._conn.lock:
-            recs = await self._conn.wait(self._fetch_gen(1))
-        if recs:
-            self._pos += 1
-            return recs[0]
-        else:
-            return None
-
-    async def fetchmany(self, size: int = 0) -> list[Row]:
-        if not size:
-            size = self.arraysize
-        async with self._conn.lock:
-            recs = await self._conn.wait(self._fetch_gen(size))
-        self._pos += len(recs)
-        return recs
-
-    async def fetchall(self) -> list[Row]:
-        async with self._conn.lock:
-            recs = await self._conn.wait(self._fetch_gen(None))
-        self._pos += len(recs)
-        return recs
-
-    async def __aiter__(self) -> AsyncIterator[Row]:
-        while True:
-            async with self._conn.lock:
-                recs = await self._conn.wait(self._fetch_gen(self.itersize))
-            for rec in recs:
-                self._pos += 1
-                yield rec
-            if len(recs) < self.itersize:
-                break
-
-    async def scroll(self, value: int, mode: str = "relative") -> None:
-        async with self._conn.lock:
-            await self._conn.wait(self._scroll_gen(value, mode))