import pytest
import psycopg
-import psycopg_pool as pool
from psycopg.pq import TransactionStatus
+pytestmark = []
+
+try:
+ from psycopg_pool import ConnectionPool # noqa: F401
+except ImportError as ex:
+ pytestmark.append(pytest.mark.skip(reason=str(ex)))
+else:
+ import psycopg_pool as pool
+
def test_defaults(dsn):
with pool.ConnectionPool(dsn) as p:
import pytest
import psycopg
-import psycopg_pool as pool
from psycopg.pq import TransactionStatus
from psycopg._compat import create_task
),
]
+try:
+ from psycopg_pool import AsyncConnectionPool # noqa: F401
+except ImportError as ex:
+ pytestmark.append(pytest.mark.skip(reason=str(ex)))
+else:
+ import psycopg_pool as pool
+
async def test_defaults(dsn):
async with pool.AsyncConnectionPool(dsn) as p:
import pytest
-from psycopg_pool.sched import Scheduler
+pytestmark = [pytest.mark.timing]
-pytestmark = pytest.mark.timing
+try:
+ from psycopg_pool.sched import Scheduler
+except ImportError as ex:
+ pytestmark.append(pytest.mark.skip(reason=str(ex)))
@pytest.mark.slow
import pytest
from psycopg._compat import create_task
-from psycopg_pool.sched import AsyncScheduler
pytestmark = [pytest.mark.asyncio, pytest.mark.timing]
+try:
+ from psycopg_pool.sched import AsyncScheduler
+except ImportError as ex:
+ pytestmark.append(pytest.mark.skip(reason=str(ex)))
+
@pytest.mark.slow
async def test_sched():