import asyncio
import random
-from sqlalchemy import Column
-from sqlalchemy import Integer
-from sqlalchemy import String
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine
-from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
+from sqlalchemy.orm import DeclarativeBase
+from sqlalchemy.orm import Mapped
+from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import merge_frozen_result
-Base = declarative_base()
+
+class Base(DeclarativeBase):
+ pass
class A(Base):
__tablename__ = "a"
- id = Column(Integer, primary_key=True)
- data = Column(String)
+ id: Mapped[int] = mapped_column(primary_key=True)
+ data: Mapped[str]
+ def __repr__(self):
+ id_, data = self.id, self.data
+ return f"A({id_=}, {data=})"
-async def run_out_of_band(
- async_sessionmaker, session, statement, merge_results=True
-):
- """run an ORM statement in a distinct session, merging the result
- back into the given session.
+async def run_out_of_band(async_sessionmaker, statement, merge_results=True):
+ """run an ORM statement in a distinct session,
+ returning the frozen results
"""
async with async_sessionmaker() as oob_session:
execution_options={"isolation_level": "AUTOCOMMIT"}
)
- # pre 1.4.24
- # await oob_session.run_sync(
- # lambda sync_session: sync_session.connection(
- # execution_options={"isolation_level": "AUTOCOMMIT"}
- # )
- # )
-
result = await oob_session.execute(statement)
if merge_results:
- # merge_results means the ORM objects from the result
- # will be merged back into the original session.
- # load=False means we can use the objects directly without
- # re-selecting them. however this merge operation is still
- # more expensive CPU-wise than a regular ORM load because the
- # objects are copied into new instances
- return (
- await session.run_sync(
- merge_frozen_result,
- statement,
- result.freeze(),
- load=False,
- )
- )()
+ return result.freeze()
else:
await result.close()
for i in range(30)
]
- results = await asyncio.gather(
+ frozen_results = await asyncio.gather(
*(
- run_out_of_band(async_session, session, statement)
+ run_out_of_band(async_session, statement)
for statement in statements
)
)
+ results = [
+ # merge_results means the ORM objects from the result
+ # will be merged back into the original session.
+ # load=False means we can use the objects directly without
+ # re-selecting them. however this merge operation is still
+ # more expensive CPU-wise than a regular ORM load because the
+ # objects are copied into new instances
+ (
+ await session.run_sync(
+ merge_frozen_result, statement, result, load=False
+ )
+ )()
+ for statement, result in zip(statements, frozen_results)
+ ]
+
print(f"results: {[r.all() for r in results]}")