From a48ebfe72558ca4cf16ff1ce06e4efd83de48685 Mon Sep 17 00:00:00 2001 From: Federico Caselli Date: Thu, 22 Jun 2023 22:09:23 +0200 Subject: [PATCH] change gather orm example changed gather orm example to avoid passing the same session to the function called in the gather. While this is not an issue since no sql is executed on it, it's still confusing for some users References: #9312 Change-Id: I7f86e9c1640649ac13fd459b761d67718d4c3dc0 --- examples/asyncio/gather_orm_statements.py | 67 +++++++++++------------ 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/examples/asyncio/gather_orm_statements.py b/examples/asyncio/gather_orm_statements.py index 334521f0e4..b11ee558cd 100644 --- a/examples/asyncio/gather_orm_statements.py +++ b/examples/asyncio/gather_orm_statements.py @@ -19,31 +19,33 @@ traditional approach. import asyncio import random -from sqlalchemy import Column -from sqlalchemy import Integer -from sqlalchemy import String from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.ext.asyncio import create_async_engine -from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.future import select +from sqlalchemy.orm import DeclarativeBase +from sqlalchemy.orm import Mapped +from sqlalchemy.orm import mapped_column from sqlalchemy.orm import merge_frozen_result -Base = declarative_base() + +class Base(DeclarativeBase): + pass class A(Base): __tablename__ = "a" - id = Column(Integer, primary_key=True) - data = Column(String) + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[str] + def __repr__(self): + id_, data = self.id, self.data + return f"A({id_=}, {data=})" -async def run_out_of_band( - async_sessionmaker, session, statement, merge_results=True -): - """run an ORM statement in a distinct session, merging the result - back into the given session. +async def run_out_of_band(async_sessionmaker, statement, merge_results=True): + """run an ORM statement in a distinct session, + returning the frozen results """ async with async_sessionmaker() as oob_session: @@ -53,30 +55,10 @@ async def run_out_of_band( execution_options={"isolation_level": "AUTOCOMMIT"} ) - # pre 1.4.24 - # await oob_session.run_sync( - # lambda sync_session: sync_session.connection( - # execution_options={"isolation_level": "AUTOCOMMIT"} - # ) - # ) - result = await oob_session.execute(statement) if merge_results: - # merge_results means the ORM objects from the result - # will be merged back into the original session. - # load=False means we can use the objects directly without - # re-selecting them. however this merge operation is still - # more expensive CPU-wise than a regular ORM load because the - # objects are copied into new instances - return ( - await session.run_sync( - merge_frozen_result, - statement, - result.freeze(), - load=False, - ) - )() + return result.freeze() else: await result.close() @@ -101,12 +83,27 @@ async def async_main(): for i in range(30) ] - results = await asyncio.gather( + frozen_results = await asyncio.gather( *( - run_out_of_band(async_session, session, statement) + run_out_of_band(async_session, statement) for statement in statements ) ) + results = [ + # merge_results means the ORM objects from the result + # will be merged back into the original session. + # load=False means we can use the objects directly without + # re-selecting them. however this merge operation is still + # more expensive CPU-wise than a regular ORM load because the + # objects are copied into new instances + ( + await session.run_sync( + merge_frozen_result, statement, result, load=False + ) + )() + for statement, result in zip(statements, frozen_results) + ] + print(f"results: {[r.all() for r in results]}") -- 2.47.3