if sys.version_info >= (3, 8):
create_task = asyncio.create_task
- Task = asyncio.Task
elif sys.version_info >= (3, 7):
) -> "asyncio.Future[T]":
return asyncio.create_task(coro)
- Task = asyncio.Future
else:
) -> "asyncio.Future[T]":
return asyncio.ensure_future(coro)
- Task = asyncio.Future
if sys.version_info >= (3, 9):
from zoneinfo import ZoneInfo
--- /dev/null
+"""
+compatibility functions for different Python versions
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+import sys
+import asyncio
+from typing import Any, Awaitable, Generator, Optional, Union, TypeVar
+
+T = TypeVar("T")
+FutureT = Union["asyncio.Future[T]", Generator[Any, None, T], Awaitable[T]]
+
+if sys.version_info >= (3, 7):
+ from contextlib import asynccontextmanager
+else:
+
+ def asynccontextmanager(func):
+ def helper(*args, **kwds):
+ raise NotImplementedError(
+ "async pool not implemented on Python 3.6"
+ )
+
+ return helper
+
+
+if sys.version_info >= (3, 8):
+ create_task = asyncio.create_task
+ Task = asyncio.Task
+
+elif sys.version_info >= (3, 7):
+
+ def create_task(
+ coro: FutureT[T], name: Optional[str] = None
+ ) -> "asyncio.Future[T]":
+ return asyncio.create_task(coro)
+
+ Task = asyncio.Future
+
+else:
+
+ def create_task(
+ coro: FutureT[T], name: Optional[str] = None
+ ) -> "asyncio.Future[T]":
+ return asyncio.ensure_future(coro)
+
+ Task = asyncio.Future
+
+if sys.version_info >= (3, 9):
+ from collections import Counter, deque as Deque
+else:
+ from typing import Counter, Deque
+
+__all__ = [
+ "Counter",
+ "Deque",
+ "Task",
+ "asynccontextmanager",
+ "create_task",
+]
from typing import Any, Callable, Dict, Generic, Optional
from psycopg.abc import ConnectionType
-from psycopg._compat import Counter, Deque
+
+from ._compat import Counter, Deque
class BasePool(Generic[ConnectionType]):
from psycopg import errors as e
from psycopg import Connection
from psycopg.pq import TransactionStatus
-from psycopg._compat import Deque
from .base import ConnectionAttempt, BasePool
from .sched import Scheduler
from .errors import PoolClosed, PoolTimeout, TooManyRequests
+from ._compat import Deque
logger = logging.getLogger("psycopg.pool")
from psycopg import errors as e
from psycopg.pq import TransactionStatus
-from psycopg._compat import Task, asynccontextmanager, create_task, Deque
from psycopg.connection_async import AsyncConnection
from .base import ConnectionAttempt, BasePool
from .sched import AsyncScheduler
from .errors import PoolClosed, PoolTimeout, TooManyRequests
+from ._compat import Task, asynccontextmanager, create_task, Deque
logger = logging.getLogger("psycopg.pool")