from ... import pool
from ...connectors.asyncio import AsyncAdapt_dbapi_connection
from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
+from ...connectors.asyncio import AsyncAdapt_dbapi_module
from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
from ...util.concurrency import await_
-from ...connectors.asyncio import AsyncAdapt_dbapi_module
if TYPE_CHECKING:
from ...connectors.asyncio import AsyncIODBAPIConnection
connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
cursor: Optional[DBAPICursor],
) -> bool:
- self.dbapi = cast(DBAPIModule, self.dbapi)
+ self.dbapi = cast("DBAPIModule", self.dbapi)
if isinstance(e, self.dbapi.OperationalError):
err_lower = str(e).lower()
if (
connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
cursor: Optional[DBAPICursor],
) -> bool:
- self.dbapi = cast(DBAPIModule, self.dbapi)
+ self.dbapi = cast("DBAPIModule", self.dbapi)
return isinstance(
e, self.dbapi.ProgrammingError
) and "Cannot operate on a closed database." in str(e)