from . import postgres
from .copy import Copy, AsyncCopy
from ._enums import IsolationLevel
-from .cursor import AsyncCursor, Cursor
+from .cursor import Cursor
from .errors import Warning, Error, InterfaceError, DatabaseError
from .errors import DataError, OperationalError, IntegrityError
from .errors import InternalError, ProgrammingError, NotSupportedError
from .conninfo import ConnectionInfo
from .connection import BaseConnection, Connection, Notify
from .transaction import Rollback, Transaction, AsyncTransaction
+from .cursor_async import AsyncCursor
from .server_cursor import AsyncServerCursor, ServerCursor
from .connection_async import AsyncConnection
from .rows import Row, AsyncRowFactory, tuple_row, TupleRow
from ._enums import IsolationLevel
from .compat import asynccontextmanager
-from .cursor import AsyncCursor
from .conninfo import _conninfo_connect_timeout
from .connection import BaseConnection, CursorRow, Notify
from .generators import notifies
from .transaction import AsyncTransaction
+from .cursor_async import AsyncCursor
from .server_cursor import AsyncServerCursor
if TYPE_CHECKING:
if TYPE_CHECKING:
from .pq.abc import PGresult
- from .cursor import BaseCursor, Cursor, AsyncCursor
+ from .cursor import BaseCursor, Cursor
+ from .cursor_async import AsyncCursor
from .connection import Connection # noqa: F401
from .connection_async import AsyncConnection # noqa: F401
import sys
from types import TracebackType
-from typing import Any, AsyncIterator, Callable, Generic, Iterator, List
+from typing import Any, Callable, Generic, Iterator, List
from typing import Optional, NoReturn, Sequence, Type, TYPE_CHECKING, TypeVar
from contextlib import contextmanager
from .pq import ExecStatus, Format
from .abc import ConnectionType, Query, Params, PQGen
-from .copy import Copy, AsyncCopy
-from .rows import Row, RowMaker, RowFactory, AsyncRowFactory
-from .compat import asynccontextmanager
+from .copy import Copy
+from .rows import Row, RowMaker, RowFactory
from ._column import Column
from ._cmodule import _psycopg
from ._queries import PostgresQuery
from .abc import Transformer
from .pq.abc import PGconn, PGresult
from .connection import Connection
- from .connection_async import AsyncConnection
execute: Callable[["PGconn"], PQGen[List["PGresult"]]]
with Copy(self) as copy:
yield copy
-
-
-class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
- __module__ = "psycopg"
- __slots__ = ()
-
- def __init__(
- self,
- connection: "AsyncConnection[Any]",
- *,
- row_factory: AsyncRowFactory[Row],
- ):
- super().__init__(connection)
- self._row_factory = row_factory
-
- async def __aenter__(self: AnyCursor) -> AnyCursor:
- return self
-
- async def __aexit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType],
- ) -> None:
- await self.close()
-
- async def close(self) -> None:
- self._close()
-
- @property
- def row_factory(self) -> AsyncRowFactory[Row]:
- return self._row_factory
-
- @row_factory.setter
- def row_factory(self, row_factory: AsyncRowFactory[Row]) -> None:
- self._row_factory = row_factory
- if self.pgresult:
- self._make_row = row_factory(self)
-
- def _make_row_maker(self) -> RowMaker[Row]:
- return self._row_factory(self)
-
- async def execute(
- self: AnyCursor,
- query: Query,
- params: Optional[Params] = None,
- *,
- prepare: Optional[bool] = None,
- ) -> AnyCursor:
- try:
- async with self._conn.lock:
- await self._conn.wait(
- self._execute_gen(query, params, prepare=prepare)
- )
- except e.Error as ex:
- raise ex.with_traceback(None)
- return self
-
- async def executemany(
- self, query: Query, params_seq: Sequence[Params]
- ) -> None:
- async with self._conn.lock:
- await self._conn.wait(self._executemany_gen(query, params_seq))
-
- async def stream(
- self, query: Query, params: Optional[Params] = None
- ) -> AsyncIterator[Row]:
- async with self._conn.lock:
- await self._conn.wait(self._stream_send_gen(query, params))
- first = True
- while await self._conn.wait(self._stream_fetchone_gen(first)):
- rec = self._tx.load_row(0, self._make_row)
- assert rec is not None
- yield rec
- first = False
-
- async def fetchone(self) -> Optional[Row]:
- self._check_result()
- rv = self._tx.load_row(self._pos, self._make_row)
- if rv is not None:
- self._pos += 1
- return rv
-
- async def fetchmany(self, size: int = 0) -> List[Row]:
- self._check_result()
- assert self.pgresult
-
- if not size:
- size = self.arraysize
- records = self._tx.load_rows(
- self._pos,
- min(self._pos + size, self.pgresult.ntuples),
- self._make_row,
- )
- self._pos += len(records)
- return records
-
- async def fetchall(self) -> List[Row]:
- self._check_result()
- assert self.pgresult
- records = self._tx.load_rows(
- self._pos, self.pgresult.ntuples, self._make_row
- )
- self._pos = self.pgresult.ntuples
- return records
-
- async def __aiter__(self) -> AsyncIterator[Row]:
- self._check_result()
-
- def load(pos: int) -> Optional[Row]:
- return self._tx.load_row(pos, self._make_row)
-
- while 1:
- row = load(self._pos)
- if row is None:
- break
- self._pos += 1
- yield row
-
- async def scroll(self, value: int, mode: str = "relative") -> None:
- self._scroll(value, mode)
-
- @asynccontextmanager
- async def copy(self, statement: Query) -> AsyncIterator[AsyncCopy]:
- """
- :rtype: AsyncCopy
- """
- async with self._conn.lock:
- await self._conn.wait(self._start_copy_gen(statement))
-
- async with AsyncCopy(self) as copy:
- yield copy
--- /dev/null
+"""
+psycopg async cursor objects
+"""
+
+# Copyright (C) 2020-2021 The Psycopg Team
+
+from types import TracebackType
+from typing import Any, AsyncIterator, List
+from typing import Optional, Sequence, Type, TYPE_CHECKING
+
+from . import errors as e
+
+from .abc import Query, Params
+from .copy import AsyncCopy
+from .rows import Row, RowMaker, AsyncRowFactory
+from .compat import asynccontextmanager
+from .cursor import BaseCursor, AnyCursor
+
+if TYPE_CHECKING:
+ from .connection_async import AsyncConnection
+
+
+class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
+ __module__ = "psycopg"
+ __slots__ = ()
+
+ def __init__(
+ self,
+ connection: "AsyncConnection[Any]",
+ *,
+ row_factory: AsyncRowFactory[Row],
+ ):
+ super().__init__(connection)
+ self._row_factory = row_factory
+
+ async def __aenter__(self: AnyCursor) -> AnyCursor:
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ await self.close()
+
+ async def close(self) -> None:
+ self._close()
+
+ @property
+ def row_factory(self) -> AsyncRowFactory[Row]:
+ return self._row_factory
+
+ @row_factory.setter
+ def row_factory(self, row_factory: AsyncRowFactory[Row]) -> None:
+ self._row_factory = row_factory
+ if self.pgresult:
+ self._make_row = row_factory(self)
+
+ def _make_row_maker(self) -> RowMaker[Row]:
+ return self._row_factory(self)
+
+ async def execute(
+ self: AnyCursor,
+ query: Query,
+ params: Optional[Params] = None,
+ *,
+ prepare: Optional[bool] = None,
+ ) -> AnyCursor:
+ try:
+ async with self._conn.lock:
+ await self._conn.wait(
+ self._execute_gen(query, params, prepare=prepare)
+ )
+ except e.Error as ex:
+ raise ex.with_traceback(None)
+ return self
+
+ async def executemany(
+ self, query: Query, params_seq: Sequence[Params]
+ ) -> None:
+ async with self._conn.lock:
+ await self._conn.wait(self._executemany_gen(query, params_seq))
+
+ async def stream(
+ self, query: Query, params: Optional[Params] = None
+ ) -> AsyncIterator[Row]:
+ async with self._conn.lock:
+ await self._conn.wait(self._stream_send_gen(query, params))
+ first = True
+ while await self._conn.wait(self._stream_fetchone_gen(first)):
+ rec = self._tx.load_row(0, self._make_row)
+ assert rec is not None
+ yield rec
+ first = False
+
+ async def fetchone(self) -> Optional[Row]:
+ self._check_result()
+ rv = self._tx.load_row(self._pos, self._make_row)
+ if rv is not None:
+ self._pos += 1
+ return rv
+
+ async def fetchmany(self, size: int = 0) -> List[Row]:
+ self._check_result()
+ assert self.pgresult
+
+ if not size:
+ size = self.arraysize
+ records = self._tx.load_rows(
+ self._pos,
+ min(self._pos + size, self.pgresult.ntuples),
+ self._make_row,
+ )
+ self._pos += len(records)
+ return records
+
+ async def fetchall(self) -> List[Row]:
+ self._check_result()
+ assert self.pgresult
+ records = self._tx.load_rows(
+ self._pos, self.pgresult.ntuples, self._make_row
+ )
+ self._pos = self.pgresult.ntuples
+ return records
+
+ async def __aiter__(self) -> AsyncIterator[Row]:
+ self._check_result()
+
+ def load(pos: int) -> Optional[Row]:
+ return self._tx.load_row(pos, self._make_row)
+
+ while 1:
+ row = load(self._pos)
+ if row is None:
+ break
+ self._pos += 1
+ yield row
+
+ async def scroll(self, value: int, mode: str = "relative") -> None:
+ self._scroll(value, mode)
+
+ @asynccontextmanager
+ async def copy(self, statement: Query) -> AsyncIterator[AsyncCopy]:
+ """
+ :rtype: AsyncCopy
+ """
+ async with self._conn.lock:
+ await self._conn.wait(self._start_copy_gen(statement))
+
+ async with AsyncCopy(self) as copy:
+ yield copy
from .compat import Protocol
if TYPE_CHECKING:
- from .cursor import BaseCursor, Cursor, AsyncCursor
+ from .cursor import BaseCursor, Cursor
+ from .cursor_async import AsyncCursor
T = TypeVar("T")
from . import errors as e
from .abc import ConnectionType, Query, Params, PQGen
from .rows import Row, RowFactory, AsyncRowFactory
-from .cursor import AnyCursor, BaseCursor, Cursor, AsyncCursor, execute
+from .cursor import AnyCursor, BaseCursor, Cursor, execute
+from .cursor_async import AsyncCursor
if TYPE_CHECKING:
from .connection import Connection