import sys
from types import TracebackType
-from typing import Any, Callable, Generic, Iterator, List
+from typing import Any, Callable, Generic, Iterable, Iterator, List
from typing import Optional, NoReturn, Sequence, Type, TypeVar, TYPE_CHECKING
from contextlib import contextmanager
self._last_query = query
def _executemany_gen(
- self, query: Query, params_seq: Sequence[Params]
+ self, query: Query, params_seq: Iterable[Params]
) -> PQGen[None]:
"""Generator implementing `Cursor.executemany()`."""
yield from self._start_query(query)
raise ex.with_traceback(None)
return self
- def executemany(self, query: Query, params_seq: Sequence[Params]) -> None:
+ def executemany(self, query: Query, params_seq: Iterable[Params]) -> None:
"""
Execute the same command with a sequence of input data.
"""
# Copyright (C) 2020-2021 The Psycopg Team
from types import TracebackType
-from typing import Any, AsyncIterator, List
-from typing import Optional, Sequence, Type, TypeVar, TYPE_CHECKING
+from typing import Any, AsyncIterator, Iterable, List
+from typing import Optional, Type, TypeVar, TYPE_CHECKING
from . import errors as e
return self
async def executemany(
- self, query: Query, params_seq: Sequence[Params]
+ self, query: Query, params_seq: Iterable[Params]
) -> None:
async with self._conn.lock:
await self._conn.wait(self._executemany_gen(query, params_seq))
# Copyright (C) 2020-2021 The Psycopg Team
import warnings
-from typing import Any, AsyncIterator, Generic, List, Iterator, Optional
-from typing import Sequence, TypeVar, TYPE_CHECKING
+from typing import Any, AsyncIterator, Generic, List, Iterable, Iterator
+from typing import Optional, TypeVar, TYPE_CHECKING
from . import pq
from . import sql
return self
- def executemany(self, query: Query, params_seq: Sequence[Params]) -> None:
+ def executemany(self, query: Query, params_seq: Iterable[Params]) -> None:
"""Method not implemented for server-side cursors."""
raise e.NotSupportedError(
"executemany not supported on server-side cursors"
return self
async def executemany(
- self, query: Query, params_seq: Sequence[Params]
+ self, query: Query, params_seq: Iterable[Params]
) -> None:
raise e.NotSupportedError(
"executemany not supported on server-side cursors"