git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
add asyncio.gather() example; add connection opts
author Mike Bayer <mike_mp@zzzcomputing.com>
Wed, 1 Sep 2021 12:58:06 +0000 (08:58 -0400)
committer mike bayer <mike_mp@zzzcomputing.com>
Thu, 2 Sep 2021 14:19:20 +0000 (14:19 +0000)
While I don't like this approach very much, people will likely
be asking for it a lot, so represent the most correct and
efficient form we can handle right now.

Added missing ``**kw`` arguments to the
:meth:`_asyncio.AsyncSession.connection` method.

Change-Id: Idadae2a02a4d96ecb96a5723ce64d017ab4c6217
References: https://github.com/sqlalchemy/sqlalchemy/discussions/6965

doc/build/changelog/unreleased_14/async_conn.rst [new file with mode: 0644]
examples/asyncio/gather_orm_statements.py [new file with mode: 0644]
lib/sqlalchemy/ext/asyncio/session.py
test/ext/asyncio/test_session_py3k.py

diff --git a/doc/build/changelog/unreleased_14/async_conn.rst b/doc/build/changelog/unreleased_14/async_conn.rst
new file mode 100644 (file)
index 0000000..7acb147
--- /dev/null
@@ -0,0 +1,5 @@
+.. change::
+    :tags: bug, asyncio
+
+    Added missing ``**kw`` arguments to the
+    :meth:`_asyncio.AsyncSession.connection` method.
diff --git a/examples/asyncio/gather_orm_statements.py b/examples/asyncio/gather_orm_statements.py
new file mode 100644 (file)
index 0000000..edcdc1f
--- /dev/null
@@ -0,0 +1,118 @@
+"""
+Illustrates how to run many statements concurrently using ``asyncio.gather()``
+across many asyncio database connections, merging ORM results into a single
+``AsyncSession``.
+
+Note that this pattern loses all transactional safety and is also not
+necessarily any more performant than using a single Session, as it adds
+significant CPU-bound work both to maintain more database connections
+and sessions, as well as within the merging of results from external sessions
+into one.
+
+Python is a CPU-intensive language even in trivial cases, so it is strongly
+recommended that any workarounds for "speed" such as the one below are
+carefully vetted to show that they do in fact improve performance vs a
+traditional approach.
+
+"""
+
+import asyncio
+import random
+
+from sqlalchemy import Column
+from sqlalchemy import Integer
+from sqlalchemy import String
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.ext.asyncio import create_async_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.future import select
+from sqlalchemy.orm import merge_frozen_result
+from sqlalchemy.orm import sessionmaker
+
+Base = declarative_base()
+
+
+class A(Base):
+    __tablename__ = "a"
+
+    id = Column(Integer, primary_key=True)
+    data = Column(String)
+
+
+async def run_out_of_band(
+    sessionmaker, session, statement, merge_results=True
+):
+    """run an ORM statement in a distinct session, merging the result
+    back into the given session.
+
+    """
+
+    async with sessionmaker() as oob_session:
+
+        # use AUTOCOMMIT for each connection to reduce transaction
+        # overhead / contention
+        await oob_session.connection(
+            execution_options={"isolation_level": "AUTOCOMMIT"}
+        )
+
+        # pre 1.4.24
+        # await oob_session.run_sync(
+        #     lambda sync_session: sync_session.connection(
+        #         execution_options={"isolation_level": "AUTOCOMMIT"}
+        #     )
+        # )
+
+        result = await oob_session.execute(statement)
+
+        if merge_results:
+            # merge_results means the ORM objects from the result
+            # will be merged back into the original session.
+            # load=False means we can use the objects directly without
+            # re-selecting them.  however this merge operation is still
+            # more expensive CPU-wise than a regular ORM load because the
+            # objects are copied into new instances
+            return (
+                await session.run_sync(
+                    merge_frozen_result,
+                    statement,
+                    result.freeze(),
+                    load=False,
+                )
+            )()
+        else:
+            await result.close()
+
+
+async def async_main():
+
+    engine = create_async_engine(
+        "postgresql+asyncpg://scott:tiger@localhost/test",
+        echo=True,
+    )
+
+    async with engine.begin() as conn:
+        await conn.run_sync(Base.metadata.drop_all)
+        await conn.run_sync(Base.metadata.create_all)
+
+    async_session = sessionmaker(
+        engine, expire_on_commit=False, class_=AsyncSession
+    )
+
+    async with async_session() as session, session.begin():
+        session.add_all([A(data="a_%d" % i) for i in range(100)])
+
+    statements = [
+        select(A).where(A.data == "a_%d" % random.choice(range(100)))
+        for i in range(30)
+    ]
+
+    results = await asyncio.gather(
+        *(
+            run_out_of_band(async_session, session, statement)
+            for statement in statements
+        )
+    )
+    print(f"results: {[r.all() for r in results]}")
+
+
+asyncio.run(async_main())
index a62c7177cc47296821878cef2b30a214f772aac1..6b18e3d7c74349df5e8f1267980fe837cafe9d84 100644 (file)
@@ -330,13 +330,18 @@ class AsyncSession(ReversibleProxy):
         else:
             return None
 
-    async def connection(self):
+    async def connection(self, **kw):
         r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
         this :class:`.Session` object's transactional state.
 
+        .. versionadded:: 1.4.24  Added ``**kw`` arguments which are passed
+           through to the underlying :meth:`_orm.Session.connection` method.
+
         """
 
-        sync_connection = await greenlet_spawn(self.sync_session.connection)
+        sync_connection = await greenlet_spawn(
+            self.sync_session.connection, **kw
+        )
         return engine.AsyncConnection._retrieve_proxy_for_target(
             sync_connection
         )
index 459d95ea6af3047f112a73e4e95973ce4c0ee326..4165991d41893404e24b34e1b405029d6b5959ad 100644 (file)
@@ -614,6 +614,17 @@ class AsyncProxyTest(AsyncFixture):
         is_(c1, c2)
         is_(c1.engine, c2.engine)
 
+    @async_test
+    async def test_get_connection_kws(self, async_session):
+        c1 = await async_session.connection(
+            execution_options={"isolation_level": "AUTOCOMMIT"}
+        )
+
+        eq_(
+            c1.sync_connection._execution_options,
+            {"isolation_level": "AUTOCOMMIT"},
+        )
+
     @async_test
     async def test_get_connection_connection_bound(self, async_engine):
         async with async_engine.begin() as conn: