From d640192877e4d1da75e8dea34d2374c404e80538 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Wed, 1 Sep 2021 08:58:06 -0400 Subject: [PATCH] add asyncio.gather() example; add connection opts while I dont like this approach very much, people will likely be asking for it a lot, so represent the most correct and efficient form we can handle right now. Added missing ``**kw`` arguments to the :meth:`_asyncio.AsyncSession.connection` method. Change-Id: Idadae2a02a4d96ecb96a5723ce64d017ab4c6217 References: https://github.com/sqlalchemy/sqlalchemy/discussions/6965 --- .../changelog/unreleased_14/async_conn.rst | 5 + examples/asyncio/gather_orm_statements.py | 118 ++++++++++++++++++ lib/sqlalchemy/ext/asyncio/session.py | 9 +- test/ext/asyncio/test_session_py3k.py | 11 ++ 4 files changed, 141 insertions(+), 2 deletions(-) create mode 100644 doc/build/changelog/unreleased_14/async_conn.rst create mode 100644 examples/asyncio/gather_orm_statements.py diff --git a/doc/build/changelog/unreleased_14/async_conn.rst b/doc/build/changelog/unreleased_14/async_conn.rst new file mode 100644 index 0000000000..7acb147ad2 --- /dev/null +++ b/doc/build/changelog/unreleased_14/async_conn.rst @@ -0,0 +1,5 @@ +.. change:: + :tags: bug, asyncio + + Added missing ``**kw`` arguments to the + :meth:`_asyncio.AsyncSession.connection` method. diff --git a/examples/asyncio/gather_orm_statements.py b/examples/asyncio/gather_orm_statements.py new file mode 100644 index 0000000000..edcdc1fe84 --- /dev/null +++ b/examples/asyncio/gather_orm_statements.py @@ -0,0 +1,118 @@ +""" +Illustrates how to run many statements concurrently using ``asyncio.gather()`` +along many asyncio database connections, merging ORM results into a single +``AsyncSession``. + +Note that this pattern loses all transactional safety and is also not +necessarily any more performant than using a single Session, as it adds +significant CPU-bound work both to maintain more database connections +and sessions, as well as within the merging of results from external sessions +into one. + +Python is a CPU-intensive language even in trivial cases, so it is strongly +recommended that any workarounds for "speed" such as the one below are +carefully vetted to show that they do in fact improve performance vs a +traditional approach. + +""" + +import asyncio +import random + +from sqlalchemy import Column +from sqlalchemy import Integer +from sqlalchemy import String +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.ext.asyncio import create_async_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.future import select +from sqlalchemy.orm import merge_frozen_result +from sqlalchemy.orm import sessionmaker + +Base = declarative_base() + + +class A(Base): + __tablename__ = "a" + + id = Column(Integer, primary_key=True) + data = Column(String) + + +async def run_out_of_band( + sessionmaker, session, statement, merge_results=True +): + """run an ORM statement in a distinct session, merging the result + back into the given session. + + """ + + async with sessionmaker() as oob_session: + + # use AUTOCOMMIT for each connection to reduce transaction + # overhead / contention + await oob_session.connection( + execution_options={"isolation_level": "AUTOCOMMIT"} + ) + + # pre 1.4.24 + # await oob_session.run_sync( + # lambda sync_session: sync_session.connection( + # execution_options={"isolation_level": "AUTOCOMMIT"} + # ) + # ) + + result = await oob_session.execute(statement) + + if merge_results: + # merge_results means the ORM objects from the result + # will be merged back into the original session. + # load=False means we can use the objects directly without + # re-selecting them. however this merge operation is still + # more expensive CPU-wise than a regular ORM load because the + # objects are copied into new instances + return ( + await session.run_sync( + merge_frozen_result, + statement, + result.freeze(), + load=False, + ) + )() + else: + await result.close() + + +async def async_main(): + + engine = create_async_engine( + "postgresql+asyncpg://scott:tiger@localhost/test", + echo=True, + ) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await conn.run_sync(Base.metadata.create_all) + + async_session = sessionmaker( + engine, expire_on_commit=False, class_=AsyncSession + ) + + async with async_session() as session, session.begin(): + session.add_all([A(data="a_%d" % i) for i in range(100)]) + + statements = [ + select(A).where(A.data == "a_%d" % random.choice(range(100))) + for i in range(30) + ] + + results = await asyncio.gather( + *( + run_out_of_band(async_session, session, statement) + for statement in statements + ) + ) + print(f"results: {[r.all() for r in results]}") + + +asyncio.run(async_main()) diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py index a62c7177cc..6b18e3d7c7 100644 --- a/lib/sqlalchemy/ext/asyncio/session.py +++ b/lib/sqlalchemy/ext/asyncio/session.py @@ -330,13 +330,18 @@ class AsyncSession(ReversibleProxy): else: return None - async def connection(self): + async def connection(self, **kw): r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to this :class:`.Session` object's transactional state. + .. versionadded:: 1.4.24 Added **kw arguments which are passed through + to the underlying :meth:`_orm.Session.connection` method. + """ - sync_connection = await greenlet_spawn(self.sync_session.connection) + sync_connection = await greenlet_spawn( + self.sync_session.connection, **kw + ) return engine.AsyncConnection._retrieve_proxy_for_target( sync_connection ) diff --git a/test/ext/asyncio/test_session_py3k.py b/test/ext/asyncio/test_session_py3k.py index 459d95ea6a..4165991d41 100644 --- a/test/ext/asyncio/test_session_py3k.py +++ b/test/ext/asyncio/test_session_py3k.py @@ -614,6 +614,17 @@ class AsyncProxyTest(AsyncFixture): is_(c1, c2) is_(c1.engine, c2.engine) + @async_test + async def test_get_connection_kws(self, async_session): + c1 = await async_session.connection( + execution_options={"isolation_level": "AUTOCOMMIT"} + ) + + eq_( + c1.sync_connection._execution_options, + {"isolation_level": "AUTOCOMMIT"}, + ) + @async_test async def test_get_connection_connection_bound(self, async_engine): async with async_engine.begin() as conn: -- 2.47.2