From 5889410be77c19c6cc1a033bb7940aba6719c5a8 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Fri, 13 Aug 2021 10:55:59 +0100 Subject: [PATCH] Move the AsyncCursor implementation in its own module --- psycopg/psycopg/__init__.py | 3 +- psycopg/psycopg/connection_async.py | 2 +- psycopg/psycopg/copy.py | 3 +- psycopg/psycopg/cursor.py | 140 +------------------------ psycopg/psycopg/cursor_async.py | 152 ++++++++++++++++++++++++++++ psycopg/psycopg/rows.py | 3 +- psycopg/psycopg/server_cursor.py | 3 +- 7 files changed, 164 insertions(+), 142 deletions(-) create mode 100644 psycopg/psycopg/cursor_async.py diff --git a/psycopg/psycopg/__init__.py b/psycopg/psycopg/__init__.py index 22d091b3c..18894ccb3 100644 --- a/psycopg/psycopg/__init__.py +++ b/psycopg/psycopg/__init__.py @@ -11,7 +11,7 @@ from . import types from . import postgres from .copy import Copy, AsyncCopy from ._enums import IsolationLevel -from .cursor import AsyncCursor, Cursor +from .cursor import Cursor from .errors import Warning, Error, InterfaceError, DatabaseError from .errors import DataError, OperationalError, IntegrityError from .errors import InternalError, ProgrammingError, NotSupportedError @@ -19,6 +19,7 @@ from ._column import Column from .conninfo import ConnectionInfo from .connection import BaseConnection, Connection, Notify from .transaction import Rollback, Transaction, AsyncTransaction +from .cursor_async import AsyncCursor from .server_cursor import AsyncServerCursor, ServerCursor from .connection_async import AsyncConnection diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 538b660be..94ba251f4 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -19,11 +19,11 @@ from .abc import Params, PQGen, PQGenConn, Query, RV from .rows import Row, AsyncRowFactory, tuple_row, TupleRow from ._enums import IsolationLevel from .compat import asynccontextmanager -from .cursor import AsyncCursor from .conninfo import _conninfo_connect_timeout from .connection import BaseConnection, CursorRow, Notify from .generators import notifies from .transaction import AsyncTransaction +from .cursor_async import AsyncCursor from .server_cursor import AsyncServerCursor if TYPE_CHECKING: diff --git a/psycopg/psycopg/copy.py b/psycopg/psycopg/copy.py index bc7dde18a..1d52451df 100644 --- a/psycopg/psycopg/copy.py +++ b/psycopg/psycopg/copy.py @@ -25,7 +25,8 @@ from .generators import copy_from, copy_to, copy_end if TYPE_CHECKING: from .pq.abc import PGresult - from .cursor import BaseCursor, Cursor, AsyncCursor + from .cursor import BaseCursor, Cursor + from .cursor_async import AsyncCursor from .connection import Connection # noqa: F401 from .connection_async import AsyncConnection # noqa: F401 diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py index 883961afc..17807790c 100644 --- a/psycopg/psycopg/cursor.py +++ b/psycopg/psycopg/cursor.py @@ -6,7 +6,7 @@ psycopg cursor objects import sys from types import TracebackType -from typing import Any, AsyncIterator, Callable, Generic, Iterator, List +from typing import Any, Callable, Generic, Iterator, List from typing import Optional, NoReturn, Sequence, Type, TYPE_CHECKING, TypeVar from contextlib import contextmanager @@ -17,9 +17,8 @@ from . import generators from .pq import ExecStatus, Format from .abc import ConnectionType, Query, Params, PQGen -from .copy import Copy, AsyncCopy -from .rows import Row, RowMaker, RowFactory, AsyncRowFactory -from .compat import asynccontextmanager +from .copy import Copy +from .rows import Row, RowMaker, RowFactory from ._column import Column from ._cmodule import _psycopg from ._queries import PostgresQuery @@ -29,7 +28,6 @@ if TYPE_CHECKING: from .abc import Transformer from .pq.abc import PGconn, PGresult from .connection import Connection - from .connection_async import AsyncConnection execute: Callable[["PGconn"], PQGen[List["PGresult"]]] @@ -633,135 +631,3 @@ class Cursor(BaseCursor["Connection[Any]", Row]): with Copy(self) as copy: yield copy - - -class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): - __module__ = "psycopg" - __slots__ = () - - def __init__( - self, - connection: "AsyncConnection[Any]", - *, - row_factory: AsyncRowFactory[Row], - ): - super().__init__(connection) - self._row_factory = row_factory - - async def __aenter__(self: AnyCursor) -> AnyCursor: - return self - - async def __aexit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - await self.close() - - async def close(self) -> None: - self._close() - - @property - def row_factory(self) -> AsyncRowFactory[Row]: - return self._row_factory - - @row_factory.setter - def row_factory(self, row_factory: AsyncRowFactory[Row]) -> None: - self._row_factory = row_factory - if self.pgresult: - self._make_row = row_factory(self) - - def _make_row_maker(self) -> RowMaker[Row]: - return self._row_factory(self) - - async def execute( - self: AnyCursor, - query: Query, - params: Optional[Params] = None, - *, - prepare: Optional[bool] = None, - ) -> AnyCursor: - try: - async with self._conn.lock: - await self._conn.wait( - self._execute_gen(query, params, prepare=prepare) - ) - except e.Error as ex: - raise ex.with_traceback(None) - return self - - async def executemany( - self, query: Query, params_seq: Sequence[Params] - ) -> None: - async with self._conn.lock: - await self._conn.wait(self._executemany_gen(query, params_seq)) - - async def stream( - self, query: Query, params: Optional[Params] = None - ) -> AsyncIterator[Row]: - async with self._conn.lock: - await self._conn.wait(self._stream_send_gen(query, params)) - first = True - while await self._conn.wait(self._stream_fetchone_gen(first)): - rec = self._tx.load_row(0, self._make_row) - assert rec is not None - yield rec - first = False - - async def fetchone(self) -> Optional[Row]: - self._check_result() - rv = self._tx.load_row(self._pos, self._make_row) - if rv is not None: - self._pos += 1 - return rv - - async def fetchmany(self, size: int = 0) -> List[Row]: - self._check_result() - assert self.pgresult - - if not size: - size = self.arraysize - records = self._tx.load_rows( - self._pos, - min(self._pos + size, self.pgresult.ntuples), - self._make_row, - ) - self._pos += len(records) - return records - - async def fetchall(self) -> List[Row]: - self._check_result() - assert self.pgresult - records = self._tx.load_rows( - self._pos, self.pgresult.ntuples, self._make_row - ) - self._pos = self.pgresult.ntuples - return records - - async def __aiter__(self) -> AsyncIterator[Row]: - self._check_result() - - def load(pos: int) -> Optional[Row]: - return self._tx.load_row(pos, self._make_row) - - while 1: - row = load(self._pos) - if row is None: - break - self._pos += 1 - yield row - - async def scroll(self, value: int, mode: str = "relative") -> None: - self._scroll(value, mode) - - @asynccontextmanager - async def copy(self, statement: Query) -> AsyncIterator[AsyncCopy]: - """ - :rtype: AsyncCopy - """ - async with self._conn.lock: - await self._conn.wait(self._start_copy_gen(statement)) - - async with AsyncCopy(self) as copy: - yield copy diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py new file mode 100644 index 000000000..36a6f152a --- /dev/null +++ b/psycopg/psycopg/cursor_async.py @@ -0,0 +1,152 @@ +""" +psycopg async cursor objects +""" + +# Copyright (C) 2020-2021 The Psycopg Team + +from types import TracebackType +from typing import Any, AsyncIterator, List +from typing import Optional, Sequence, Type, TYPE_CHECKING + +from . import errors as e + +from .abc import Query, Params +from .copy import AsyncCopy +from .rows import Row, RowMaker, AsyncRowFactory +from .compat import asynccontextmanager +from .cursor import BaseCursor, AnyCursor + +if TYPE_CHECKING: + from .connection_async import AsyncConnection + + +class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): + __module__ = "psycopg" + __slots__ = () + + def __init__( + self, + connection: "AsyncConnection[Any]", + *, + row_factory: AsyncRowFactory[Row], + ): + super().__init__(connection) + self._row_factory = row_factory + + async def __aenter__(self: AnyCursor) -> AnyCursor: + return self + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + await self.close() + + async def close(self) -> None: + self._close() + + @property + def row_factory(self) -> AsyncRowFactory[Row]: + return self._row_factory + + @row_factory.setter + def row_factory(self, row_factory: AsyncRowFactory[Row]) -> None: + self._row_factory = row_factory + if self.pgresult: + self._make_row = row_factory(self) + + def _make_row_maker(self) -> RowMaker[Row]: + return self._row_factory(self) + + async def execute( + self: AnyCursor, + query: Query, + params: Optional[Params] = None, + *, + prepare: Optional[bool] = None, + ) -> AnyCursor: + try: + async with self._conn.lock: + await self._conn.wait( + self._execute_gen(query, params, prepare=prepare) + ) + except e.Error as ex: + raise ex.with_traceback(None) + return self + + async def executemany( + self, query: Query, params_seq: Sequence[Params] + ) -> None: + async with self._conn.lock: + await self._conn.wait(self._executemany_gen(query, params_seq)) + + async def stream( + self, query: Query, params: Optional[Params] = None + ) -> AsyncIterator[Row]: + async with self._conn.lock: + await self._conn.wait(self._stream_send_gen(query, params)) + first = True + while await self._conn.wait(self._stream_fetchone_gen(first)): + rec = self._tx.load_row(0, self._make_row) + assert rec is not None + yield rec + first = False + + async def fetchone(self) -> Optional[Row]: + self._check_result() + rv = self._tx.load_row(self._pos, self._make_row) + if rv is not None: + self._pos += 1 + return rv + + async def fetchmany(self, size: int = 0) -> List[Row]: + self._check_result() + assert self.pgresult + + if not size: + size = self.arraysize + records = self._tx.load_rows( + self._pos, + min(self._pos + size, self.pgresult.ntuples), + self._make_row, + ) + self._pos += len(records) + return records + + async def fetchall(self) -> List[Row]: + self._check_result() + assert self.pgresult + records = self._tx.load_rows( + self._pos, self.pgresult.ntuples, self._make_row + ) + self._pos = self.pgresult.ntuples + return records + + async def __aiter__(self) -> AsyncIterator[Row]: + self._check_result() + + def load(pos: int) -> Optional[Row]: + return self._tx.load_row(pos, self._make_row) + + while 1: + row = load(self._pos) + if row is None: + break + self._pos += 1 + yield row + + async def scroll(self, value: int, mode: str = "relative") -> None: + self._scroll(value, mode) + + @asynccontextmanager + async def copy(self, statement: Query) -> AsyncIterator[AsyncCopy]: + """ + :rtype: AsyncCopy + """ + async with self._conn.lock: + await self._conn.wait(self._start_copy_gen(statement)) + + async with AsyncCopy(self) as copy: + yield copy diff --git a/psycopg/psycopg/rows.py b/psycopg/psycopg/rows.py index f181e8787..927c2919f 100644 --- a/psycopg/psycopg/rows.py +++ b/psycopg/psycopg/rows.py @@ -14,7 +14,8 @@ from . import errors as e from .compat import Protocol if TYPE_CHECKING: - from .cursor import BaseCursor, Cursor, AsyncCursor + from .cursor import BaseCursor, Cursor + from .cursor_async import AsyncCursor T = TypeVar("T") diff --git a/psycopg/psycopg/server_cursor.py b/psycopg/psycopg/server_cursor.py index 238a1685b..c8d956b6a 100644 --- a/psycopg/psycopg/server_cursor.py +++ b/psycopg/psycopg/server_cursor.py @@ -13,7 +13,8 @@ from . import sql from . import errors as e from .abc import ConnectionType, Query, Params, PQGen from .rows import Row, RowFactory, AsyncRowFactory -from .cursor import AnyCursor, BaseCursor, Cursor, AsyncCursor, execute +from .cursor import AnyCursor, BaseCursor, Cursor, execute +from .cursor_async import AsyncCursor if TYPE_CHECKING: from .connection import Connection -- 2.47.3