def connect(self, *arg, **kw):
async_fallback = kw.pop("async_fallback", False)
+ creator_fn = kw.pop("creator_fn", self.psycopg.AsyncConnection.connect)
if util.asbool(async_fallback):
return AsyncAdaptFallback_psycopg_connection(
- await_fallback(
- self.psycopg.AsyncConnection.connect(*arg, **kw)
- )
+ await_fallback(creator_fn(*arg, **kw))
)
else:
return AsyncAdapt_psycopg_connection(
- await_only(self.psycopg.AsyncConnection.connect(*arg, **kw))
+ await_only(creator_fn(*arg, **kw))
)
# note that to send adapted arguments like
# prepared_statement_cache_size, user would use
# "creator" and emulate this form here
- return sync_engine.dialect.dbapi.connect(creator_fn=async_creator)
+ return sync_engine.dialect.dbapi.connect( # type: ignore
+ creator_fn=async_creator
+ )
kw["creator"] = creator
sync_engine = _create_engine(url, **kw)
)
from sqlalchemy.engine import url
from sqlalchemy.sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
+from sqlalchemy.testing import async_test
from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import expect_raises_message
def test_async_version(self):
e = create_engine("postgresql+psycopg_async://")
is_true(isinstance(e.dialect, psycopg_dialect.PGDialectAsync_psycopg))
+
+
+class AsyncPostgresTest(fixtures.TestBase):
+ __requires__ = ("async_dialect",)
+
+ @testing.only_on("postgresql+psycopg")
+ @async_test
+ async def test_async_creator(self, async_testing_engine):
+ import psycopg
+
+ url = config.db.url.render_as_string(hide_password=False)
+ # format URL properly, strip driver
+ url = url.replace("+psycopg_async", "")
+
+ async def async_creator():
+ conn = await psycopg.AsyncConnection.connect(url)
+ return conn
+
+ engine = async_testing_engine(
+ options={"async_creator": async_creator},
+ )
+ async with engine.connect() as conn:
+ result = await conn.execute(select(1))
+ eq_(result.scalar(), 1)