-from asyncio import current_task
-
import sqlalchemy as sa
from sqlalchemy import func
from sqlalchemy import select
+from sqlalchemy import testing
from sqlalchemy.ext.asyncio import async_scoped_session
from sqlalchemy.ext.asyncio import AsyncSession as _AsyncSession
from sqlalchemy.testing import async_test
class AsyncScopedSessionTest(AsyncFixture):
+ @testing.requires.python37
@async_test
async def test_basic(self, async_engine):
+ from asyncio import current_task
+
AsyncSession = async_scoped_session(
sa.orm.sessionmaker(async_engine, class_=_AsyncSession),
scopefunc=current_task,