from __future__ import annotations
from typing import Any
+from typing import Callable
from typing import Dict
from typing import Generator
from typing import NoReturn
from ...engine.base import NestedTransaction
from ...engine.base import Transaction
from ...util.concurrency import greenlet_spawn
-from ...util.typing import Protocol
if TYPE_CHECKING:
from ...engine.cursor import CursorResult
_T = TypeVar("_T", bound=Any)
-class _SyncConnectionCallable(Protocol):
- def __call__(self, connection: Connection, *arg: Any, **kw: Any) -> Any:
- ...
-
-
def create_async_engine(url: Union[str, URL], **kw: Any) -> AsyncEngine:
"""Create a new async engine instance.
return result.scalars()
async def run_sync(
- self, fn: _SyncConnectionCallable, *arg: Any, **kw: Any
+ self, fn: Callable[..., Any], *arg: Any, **kw: Any
) -> Any:
"""Invoke the given sync callable passing self as the first argument.
def __init__(self, conn: AsyncConnection):
self.conn = conn
+ if TYPE_CHECKING:
+
+ async def __aenter__(self) -> AsyncConnection:
+ ...
+
async def start(self, is_ctxmanager: bool = False) -> AsyncConnection:
await self.conn.start(is_ctxmanager=is_ctxmanager)
self.transaction = self.conn.begin()
from __future__ import annotations
from typing import Any
+from typing import Callable
from typing import Dict
from typing import Generic
from typing import Iterable
from ...orm import SessionTransaction
from ...orm import state as _instance_state
from ...util.concurrency import greenlet_spawn
-from ...util.typing import Protocol
if TYPE_CHECKING:
from .engine import AsyncConnection
_T = TypeVar("_T", bound=Any)
-class _SyncSessionCallable(Protocol):
- def __call__(self, session: Session, *arg: Any, **kw: Any) -> Any:
- ...
-
-
_EXECUTE_OPTIONS = util.immutabledict({"prebuffer_rows": True})
_STREAM_OPTIONS = util.immutabledict({"stream_results": True})
)
async def run_sync(
- self, fn: _SyncSessionCallable, *arg: Any, **kw: Any
+ self, fn: Callable[..., Any], *arg: Any, **kw: Any
) -> Any:
"""Invoke the given sync callable passing sync self as the first
argument.
from __future__ import annotations
import asyncio
+from typing import Any
from typing import List
+from typing import Optional
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
+from sqlalchemy.orm import Session
if TYPE_CHECKING:
from sqlalchemy import ScalarResult
data: Mapped[str]
+def work_with_a_session_one(sess: Session) -> Any:
+ pass
+
+
+def work_with_a_session_two(sess: Session, param: Optional[str] = None) -> Any:
+ pass
+
+
async def async_main() -> None:
"""Main program function."""
async_session = async_sessionmaker(engine, expire_on_commit=False)
async with async_session.begin() as session:
+ await session.run_sync(work_with_a_session_one)
+ await session.run_sync(work_with_a_session_two, param="foo")
+
session.add_all(
[
A(bs=[B(), B()], data="a1"),