import collections
import decimal
+import functools
import json as _py_json
import re
import time
def connect(self, *arg, **kw):
async_fallback = kw.pop("async_fallback", False)
+ creator_fn = kw.pop("creator_fn", functools.partial(self.asyncpg.connect))
prepared_statement_cache_size = kw.pop(
"prepared_statement_cache_size", 100
)
if util.asbool(async_fallback):
return AsyncAdaptFallback_asyncpg_connection(
self,
- await_fallback(self.asyncpg.connect(*arg, **kw)),
+ await_fallback(creator_fn(*arg, **kw)),
prepared_statement_cache_size=prepared_statement_cache_size,
prepared_statement_name_func=prepared_statement_name_func,
)
else:
return AsyncAdapt_asyncpg_connection(
self,
- await_only(self.asyncpg.connect(*arg, **kw)),
+ await_only(creator_fn(*arg, **kw)),
prepared_statement_cache_size=prepared_statement_cache_size,
prepared_statement_name_func=prepared_statement_name_func,
)
"streaming result set"
)
kw["_is_async"] = True
+ async_creator = kw.pop("async_creator", None)
+ if async_creator:
+ async def wrap_async_creator():
+ return await async_creator()
+
+ def creator():
+ # note that to send adapted arguments like
+ # prepared_statement_cache_size, user would use
+ # "creator" and emulate this form here
+ return sync_engine.dialect.dbapi.connect(
+ creator_fn=wrap_async_creator
+ )
+ kw["creator"] = creator
sync_engine = _create_engine(url, **kw)
return AsyncEngine(sync_engine)