]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Move the AsyncCursor implementation in its own module
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 13 Aug 2021 09:55:59 +0000 (10:55 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 13 Aug 2021 09:55:59 +0000 (10:55 +0100)
psycopg/psycopg/__init__.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/copy.py
psycopg/psycopg/cursor.py
psycopg/psycopg/cursor_async.py [new file with mode: 0644]
psycopg/psycopg/rows.py
psycopg/psycopg/server_cursor.py

index 22d091b3c208058b0a7e269523f1cdfce8150db4..18894ccb3baf240e135b2b3e8f5ed3fb88f62fed 100644 (file)
@@ -11,7 +11,7 @@ from . import types
 from . import postgres
 from .copy import Copy, AsyncCopy
 from ._enums import IsolationLevel
-from .cursor import AsyncCursor, Cursor
+from .cursor import Cursor
 from .errors import Warning, Error, InterfaceError, DatabaseError
 from .errors import DataError, OperationalError, IntegrityError
 from .errors import InternalError, ProgrammingError, NotSupportedError
@@ -19,6 +19,7 @@ from ._column import Column
 from .conninfo import ConnectionInfo
 from .connection import BaseConnection, Connection, Notify
 from .transaction import Rollback, Transaction, AsyncTransaction
+from .cursor_async import AsyncCursor
 from .server_cursor import AsyncServerCursor, ServerCursor
 from .connection_async import AsyncConnection
 
index 538b660be5001d921dc58884bf66746e7fd885d5..94ba251f4b0dccf9b8224745042745002703b5ff 100644 (file)
@@ -19,11 +19,11 @@ from .abc import Params, PQGen, PQGenConn, Query, RV
 from .rows import Row, AsyncRowFactory, tuple_row, TupleRow
 from ._enums import IsolationLevel
 from .compat import asynccontextmanager
-from .cursor import AsyncCursor
 from .conninfo import _conninfo_connect_timeout
 from .connection import BaseConnection, CursorRow, Notify
 from .generators import notifies
 from .transaction import AsyncTransaction
+from .cursor_async import AsyncCursor
 from .server_cursor import AsyncServerCursor
 
 if TYPE_CHECKING:
index bc7dde18a492a85a750043c87a1f5f98a2a97ad9..1d52451dfe00e43364e405d2293362a5d6941d6a 100644 (file)
@@ -25,7 +25,8 @@ from .generators import copy_from, copy_to, copy_end
 
 if TYPE_CHECKING:
     from .pq.abc import PGresult
-    from .cursor import BaseCursor, Cursor, AsyncCursor
+    from .cursor import BaseCursor, Cursor
+    from .cursor_async import AsyncCursor
     from .connection import Connection  # noqa: F401
     from .connection_async import AsyncConnection  # noqa: F401
 
index 883961afcd08dbc6469757dc39caeb8b9c45ee8e..17807790ccff71e1c36b87f12d6da46612eff3bb 100644 (file)
@@ -6,7 +6,7 @@ psycopg cursor objects
 
 import sys
 from types import TracebackType
-from typing import Any, AsyncIterator, Callable, Generic, Iterator, List
+from typing import Any, Callable, Generic, Iterator, List
 from typing import Optional, NoReturn, Sequence, Type, TYPE_CHECKING, TypeVar
 from contextlib import contextmanager
 
@@ -17,9 +17,8 @@ from . import generators
 
 from .pq import ExecStatus, Format
 from .abc import ConnectionType, Query, Params, PQGen
-from .copy import Copy, AsyncCopy
-from .rows import Row, RowMaker, RowFactory, AsyncRowFactory
-from .compat import asynccontextmanager
+from .copy import Copy
+from .rows import Row, RowMaker, RowFactory
 from ._column import Column
 from ._cmodule import _psycopg
 from ._queries import PostgresQuery
@@ -29,7 +28,6 @@ if TYPE_CHECKING:
     from .abc import Transformer
     from .pq.abc import PGconn, PGresult
     from .connection import Connection
-    from .connection_async import AsyncConnection
 
 execute: Callable[["PGconn"], PQGen[List["PGresult"]]]
 
@@ -633,135 +631,3 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
 
         with Copy(self) as copy:
             yield copy
-
-
-class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
-    __module__ = "psycopg"
-    __slots__ = ()
-
-    def __init__(
-        self,
-        connection: "AsyncConnection[Any]",
-        *,
-        row_factory: AsyncRowFactory[Row],
-    ):
-        super().__init__(connection)
-        self._row_factory = row_factory
-
-    async def __aenter__(self: AnyCursor) -> AnyCursor:
-        return self
-
-    async def __aexit__(
-        self,
-        exc_type: Optional[Type[BaseException]],
-        exc_val: Optional[BaseException],
-        exc_tb: Optional[TracebackType],
-    ) -> None:
-        await self.close()
-
-    async def close(self) -> None:
-        self._close()
-
-    @property
-    def row_factory(self) -> AsyncRowFactory[Row]:
-        return self._row_factory
-
-    @row_factory.setter
-    def row_factory(self, row_factory: AsyncRowFactory[Row]) -> None:
-        self._row_factory = row_factory
-        if self.pgresult:
-            self._make_row = row_factory(self)
-
-    def _make_row_maker(self) -> RowMaker[Row]:
-        return self._row_factory(self)
-
-    async def execute(
-        self: AnyCursor,
-        query: Query,
-        params: Optional[Params] = None,
-        *,
-        prepare: Optional[bool] = None,
-    ) -> AnyCursor:
-        try:
-            async with self._conn.lock:
-                await self._conn.wait(
-                    self._execute_gen(query, params, prepare=prepare)
-                )
-        except e.Error as ex:
-            raise ex.with_traceback(None)
-        return self
-
-    async def executemany(
-        self, query: Query, params_seq: Sequence[Params]
-    ) -> None:
-        async with self._conn.lock:
-            await self._conn.wait(self._executemany_gen(query, params_seq))
-
-    async def stream(
-        self, query: Query, params: Optional[Params] = None
-    ) -> AsyncIterator[Row]:
-        async with self._conn.lock:
-            await self._conn.wait(self._stream_send_gen(query, params))
-            first = True
-            while await self._conn.wait(self._stream_fetchone_gen(first)):
-                rec = self._tx.load_row(0, self._make_row)
-                assert rec is not None
-                yield rec
-                first = False
-
-    async def fetchone(self) -> Optional[Row]:
-        self._check_result()
-        rv = self._tx.load_row(self._pos, self._make_row)
-        if rv is not None:
-            self._pos += 1
-        return rv
-
-    async def fetchmany(self, size: int = 0) -> List[Row]:
-        self._check_result()
-        assert self.pgresult
-
-        if not size:
-            size = self.arraysize
-        records = self._tx.load_rows(
-            self._pos,
-            min(self._pos + size, self.pgresult.ntuples),
-            self._make_row,
-        )
-        self._pos += len(records)
-        return records
-
-    async def fetchall(self) -> List[Row]:
-        self._check_result()
-        assert self.pgresult
-        records = self._tx.load_rows(
-            self._pos, self.pgresult.ntuples, self._make_row
-        )
-        self._pos = self.pgresult.ntuples
-        return records
-
-    async def __aiter__(self) -> AsyncIterator[Row]:
-        self._check_result()
-
-        def load(pos: int) -> Optional[Row]:
-            return self._tx.load_row(pos, self._make_row)
-
-        while 1:
-            row = load(self._pos)
-            if row is None:
-                break
-            self._pos += 1
-            yield row
-
-    async def scroll(self, value: int, mode: str = "relative") -> None:
-        self._scroll(value, mode)
-
-    @asynccontextmanager
-    async def copy(self, statement: Query) -> AsyncIterator[AsyncCopy]:
-        """
-        :rtype: AsyncCopy
-        """
-        async with self._conn.lock:
-            await self._conn.wait(self._start_copy_gen(statement))
-
-        async with AsyncCopy(self) as copy:
-            yield copy
diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py
new file mode 100644 (file)
index 0000000..36a6f15
--- /dev/null
@@ -0,0 +1,152 @@
+"""
+psycopg async cursor objects
+"""
+
+# Copyright (C) 2020-2021 The Psycopg Team
+
+from types import TracebackType
+from typing import Any, AsyncIterator, List
+from typing import Optional, Sequence, Type, TYPE_CHECKING
+
+from . import errors as e
+
+from .abc import Query, Params
+from .copy import AsyncCopy
+from .rows import Row, RowMaker, AsyncRowFactory
+from .compat import asynccontextmanager
+from .cursor import BaseCursor, AnyCursor
+
+if TYPE_CHECKING:
+    from .connection_async import AsyncConnection
+
+
+class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
+    __module__ = "psycopg"
+    __slots__ = ()
+
+    def __init__(
+        self,
+        connection: "AsyncConnection[Any]",
+        *,
+        row_factory: AsyncRowFactory[Row],
+    ):
+        super().__init__(connection)
+        self._row_factory = row_factory
+
+    async def __aenter__(self: AnyCursor) -> AnyCursor:
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_val: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> None:
+        await self.close()
+
+    async def close(self) -> None:
+        self._close()
+
+    @property
+    def row_factory(self) -> AsyncRowFactory[Row]:
+        return self._row_factory
+
+    @row_factory.setter
+    def row_factory(self, row_factory: AsyncRowFactory[Row]) -> None:
+        self._row_factory = row_factory
+        if self.pgresult:
+            self._make_row = row_factory(self)
+
+    def _make_row_maker(self) -> RowMaker[Row]:
+        return self._row_factory(self)
+
+    async def execute(
+        self: AnyCursor,
+        query: Query,
+        params: Optional[Params] = None,
+        *,
+        prepare: Optional[bool] = None,
+    ) -> AnyCursor:
+        try:
+            async with self._conn.lock:
+                await self._conn.wait(
+                    self._execute_gen(query, params, prepare=prepare)
+                )
+        except e.Error as ex:
+            raise ex.with_traceback(None)
+        return self
+
+    async def executemany(
+        self, query: Query, params_seq: Sequence[Params]
+    ) -> None:
+        async with self._conn.lock:
+            await self._conn.wait(self._executemany_gen(query, params_seq))
+
+    async def stream(
+        self, query: Query, params: Optional[Params] = None
+    ) -> AsyncIterator[Row]:
+        async with self._conn.lock:
+            await self._conn.wait(self._stream_send_gen(query, params))
+            first = True
+            while await self._conn.wait(self._stream_fetchone_gen(first)):
+                rec = self._tx.load_row(0, self._make_row)
+                assert rec is not None
+                yield rec
+                first = False
+
+    async def fetchone(self) -> Optional[Row]:
+        self._check_result()
+        rv = self._tx.load_row(self._pos, self._make_row)
+        if rv is not None:
+            self._pos += 1
+        return rv
+
+    async def fetchmany(self, size: int = 0) -> List[Row]:
+        self._check_result()
+        assert self.pgresult
+
+        if not size:
+            size = self.arraysize
+        records = self._tx.load_rows(
+            self._pos,
+            min(self._pos + size, self.pgresult.ntuples),
+            self._make_row,
+        )
+        self._pos += len(records)
+        return records
+
+    async def fetchall(self) -> List[Row]:
+        self._check_result()
+        assert self.pgresult
+        records = self._tx.load_rows(
+            self._pos, self.pgresult.ntuples, self._make_row
+        )
+        self._pos = self.pgresult.ntuples
+        return records
+
+    async def __aiter__(self) -> AsyncIterator[Row]:
+        self._check_result()
+
+        def load(pos: int) -> Optional[Row]:
+            return self._tx.load_row(pos, self._make_row)
+
+        while 1:
+            row = load(self._pos)
+            if row is None:
+                break
+            self._pos += 1
+            yield row
+
+    async def scroll(self, value: int, mode: str = "relative") -> None:
+        self._scroll(value, mode)
+
+    @asynccontextmanager
+    async def copy(self, statement: Query) -> AsyncIterator[AsyncCopy]:
+        """
+        :rtype: AsyncCopy
+        """
+        async with self._conn.lock:
+            await self._conn.wait(self._start_copy_gen(statement))
+
+        async with AsyncCopy(self) as copy:
+            yield copy
index f181e87870d06d805810ffefd398a2e89d992a7a..927c2919f7ae63a3193022696ea371130412b7df 100644 (file)
@@ -14,7 +14,8 @@ from . import errors as e
 from .compat import Protocol
 
 if TYPE_CHECKING:
-    from .cursor import BaseCursor, Cursor, AsyncCursor
+    from .cursor import BaseCursor, Cursor
+    from .cursor_async import AsyncCursor
 
 T = TypeVar("T")
 
index 238a1685b310e64e8dad932f918580a95b8f413c..c8d956b6a410a81a7999a620696b5fa2a562e3f7 100644 (file)
@@ -13,7 +13,8 @@ from . import sql
 from . import errors as e
 from .abc import ConnectionType, Query, Params, PQGen
 from .rows import Row, RowFactory, AsyncRowFactory
-from .cursor import AnyCursor, BaseCursor, Cursor, AsyncCursor, execute
+from .cursor import AnyCursor, BaseCursor, Cursor, execute
+from .cursor_async import AsyncCursor
 
 if TYPE_CHECKING:
     from .connection import Connection