pool =
psycopg-pool
test =
+ anyio >= 3.6.2
mypy >= 0.990
pproxy >= 2.7
pytest >= 6.2.5
- pytest-asyncio >= 0.17
pytest-cov >= 3.0
pytest-randomly >= 3.5
dev =
build-backend = "setuptools.build_meta"
[tool.pytest.ini_options]
-asyncio_mode = "auto"
filterwarnings = [
"error",
]
import sys
import asyncio
import selectors
-from typing import List
+from typing import Any, Dict, List
+
+import pytest
pytest_plugins = (
"tests.fix_db",
raise session.Failed
cache.set("segfault", True)
- # Configure the async loop.
- loop = session.config.getoption("--loop")
- if loop == "uvloop":
- import uvloop
- uvloop.install()
- else:
- assert loop == "default"
+asyncio_options: Dict[str, Any] = {}
+if sys.platform == "win32" and sys.version_info >= (3, 8):
+ asyncio_options["policy"] = asyncio.WindowsSelectorEventLoopPolicy()
+
- if sys.platform == "win32":
- asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+@pytest.fixture(
+ params=[pytest.param(("asyncio", asyncio_options.copy()), id="asyncio")],
+ scope="session",
+)
+def anyio_backend(request):
+ backend, options = request.param
+ if request.config.option.loop == "uvloop":
+ options["use_uvloop"] = True
+ return backend, options
allow_fail_messages: List[str] = []
importlib-metadata == 1.4
# From the 'test' extra
+anyio == 3.6.2
mypy == 0.990
pproxy == 2.7.0
pytest == 6.2.5
-pytest-asyncio == 0.17.0
pytest-cov == 3.0.0
pytest-randomly == 3.5.0
import pytest
-pytestmark = [pytest.mark.crdb, pytest.mark.asyncio]
+pytestmark = [pytest.mark.crdb, pytest.mark.anyio]
async def test_is_crdb(aconn):
from ..test_copy_async import ensure_table
from .test_copy import sample_tabledef, copyopt
-pytestmark = [pytest.mark.crdb, pytest.mark.asyncio]
+pytestmark = [pytest.mark.crdb, pytest.mark.anyio]
@pytest.mark.parametrize(
testfeed # fixture
-pytestmark = [pytest.mark.crdb, pytest.mark.asyncio]
+pytestmark = [pytest.mark.crdb, pytest.mark.anyio]
@pytest.mark.slow
@pytest.fixture(scope="session")
-def aconn_cls(session_dsn):
+def aconn_cls(session_dsn, anyio_backend):
cls = psycopg.AsyncConnection
if crdb_version:
from psycopg.crdb import AsyncCrdbConnection
--- /dev/null
+import pytest
+
+from ..conftest import asyncio_options
+
+
+@pytest.fixture(
+ params=[pytest.param(("asyncio", asyncio_options.copy()), id="asyncio")],
+ scope="session",
+)
+def anyio_backend(request):
+ backend, options = request.param
+ if request.config.option.loop == "uvloop":
+ options["use_uvloop"] = True
+ return backend, options
from psycopg._compat import create_task
from .test_pool_async import delay_connection, ensure_waiting
-pytestmark = [pytest.mark.asyncio]
+pytestmark = [pytest.mark.anyio]
try:
from psycopg_pool import AsyncNullConnectionPool # noqa: F401
# Tests should have been skipped if the package is not available
pass
-pytestmark = [pytest.mark.asyncio]
+pytestmark = [pytest.mark.anyio]
async def test_defaults(dsn):
# Tests should have been skipped if the package is not available
pass
-pytestmark = [pytest.mark.asyncio, pytest.mark.timing]
+pytestmark = [pytest.mark.anyio, pytest.mark.timing]
@pytest.mark.slow
from .fix_crdb import crdb_encoding
execmany = execmany # avoid F811 underneath
-pytestmark = pytest.mark.asyncio
+pytestmark = pytest.mark.anyio
@pytest.fixture
from psycopg import errors as e
from psycopg._compat import create_task
-pytestmark = pytest.mark.asyncio
-
@pytest.mark.slow
async def test_commit_concurrency(aconn):
sys.platform == "win32", reason="don't know how to Ctrl-C on Windows"
)
@pytest.mark.crdb_skip("cancel")
-async def test_ctrl_c(dsn):
+def test_ctrl_c(dsn):
script = f"""\
import signal
import asyncio
from .test_adapt import make_bin_dumper, make_dumper
from .test_conninfo import fake_resolve # noqa: F401
-pytestmark = pytest.mark.asyncio
+pytestmark = pytest.mark.anyio
async def test_connect(aconn_cls, dsn):
),
],
)
-@pytest.mark.asyncio
+@pytest.mark.anyio
async def test_resolve_hostaddr_async_no_resolve(
setpgenv, conninfo, want, env, fail_resolve
):
),
],
)
-@pytest.mark.asyncio
+@pytest.mark.anyio
async def test_resolve_hostaddr_async(conninfo, want, env, fake_resolve):
params = conninfo_to_dict(conninfo)
params = await resolve_hostaddr_async(params)
("host=1.1.1.1,2.2.2.2", {"PGPORT": "1,2,3"}),
],
)
-@pytest.mark.asyncio
+@pytest.mark.anyio
async def test_resolve_hostaddr_async_bad(setpgenv, conninfo, env, fake_resolve):
setpgenv(env)
params = conninfo_to_dict(conninfo)
from .test_copy import py_to_raw, special_chars
pytestmark = [
- pytest.mark.asyncio,
pytest.mark.crdb_skip("copy"),
]
from .fix_crdb import crdb_encoding
execmany = execmany # avoid F811 underneath
-pytestmark = pytest.mark.asyncio
async def test_init(aconn):
pytestmark = [pytest.mark.dns]
-@pytest.mark.asyncio
+@pytest.mark.anyio
async def test_resolve_hostaddr_async_warning(recwarn):
import_dnspython()
conninfo = "dbname=foo"
assert conninfo_to_dict(want) == params
-@pytest.mark.asyncio
+@pytest.mark.anyio
@pytest.mark.parametrize("conninfo, want, env", samples_ok)
async def test_srv_async(conninfo, want, env, afake_srv, setpgenv):
setpgenv(env)
psycopg._dns.resolve_srv(params) # type: ignore[attr-defined]
-@pytest.mark.asyncio
+@pytest.mark.anyio
@pytest.mark.parametrize("conninfo, env", samples_bad)
async def test_srv_bad_async(conninfo, env, afake_srv, setpgenv):
setpgenv(env)
assert exc.value.diag.sqlstate == "23503"
-@pytest.mark.asyncio
@pytest.mark.crdb_skip("deferrable")
async def test_diag_from_commit_async(aconn):
cur = aconn.cursor()
from .test_pipeline import pipeline_aborted
pytestmark = [
- pytest.mark.asyncio,
pytest.mark.pipeline,
pytest.mark.skipif("not psycopg.AsyncPipeline.is_supported()"),
]
from psycopg.rows import namedtuple_row
-pytestmark = pytest.mark.asyncio
-
@pytest.mark.parametrize("value", [None, 0, 3])
async def test_prepare_threshold_init(aconn_cls, dsn, value):
from psycopg.pq import Format
pytestmark = [
- pytest.mark.asyncio,
pytest.mark.crdb_skip("server-side cursor"),
]
from psycopg.pq import TransactionStatus
pytestmark = [
- pytest.mark.asyncio,
pytest.mark.crdb_skip("2-phase commit"),
]
from .test_transaction import ExpectedException, crdb_skip_external_observer
from .test_transaction import create_test_table # noqa # autouse fixture
-pytestmark = pytest.mark.asyncio
-
@pytest.fixture
-async def aconn(aconn, apipeline):
+async def aconn(aconn, apipeline, anyio_backend):
return aconn
assert info.regtype == "text"
-@pytest.mark.asyncio
@pytest.mark.parametrize("name", ["text", sql.Identifier("text")])
@pytest.mark.parametrize("status", ["IDLE", "INTRANS", None])
@pytest.mark.parametrize(
assert info is None
-@pytest.mark.asyncio
@_name
@_status
@_info_cls
@pytest.mark.parametrize("timeout", timeouts)
-@pytest.mark.asyncio
+@pytest.mark.anyio
async def test_wait_conn_async(dsn, timeout):
gen = generators.connect(dsn)
conn = await waiting.wait_conn_async(gen, **timeout)
assert conn.status == ConnStatus.OK
-@pytest.mark.asyncio
+@pytest.mark.anyio
async def test_wait_conn_async_bad(dsn):
gen = generators.connect("dbname=nosuchdb")
with pytest.raises(psycopg.OperationalError):
await waiting.wait_conn_async(gen)
-@pytest.mark.asyncio
+@pytest.mark.anyio
@pytest.mark.parametrize("wait, ready", zip(waiting.Wait, waiting.Ready))
@skip_if_not_linux
async def test_wait_ready_async(wait, ready):
assert r & ready
-@pytest.mark.asyncio
+@pytest.mark.anyio
async def test_wait_async(pgconn):
pgconn.send_query(b"select 1")
gen = generators.execute(pgconn)
assert res.status == ExecStatus.TUPLES_OK
-@pytest.mark.asyncio
+@pytest.mark.anyio
async def test_wait_async_bad(pgconn):
pgconn.send_query(b"select 1")
gen = generators.execute(pgconn)
assert info.field_types[i] == builtins[t].oid
-@pytest.mark.asyncio
@pytest.mark.parametrize("name, fields", fetch_cases)
async def test_fetch_info_async(aconn, testcomp, name, fields):
info = await CompositeInfo.fetch(aconn, name)
assert info.labels == list(StrTestEnum.__members__)
-@pytest.mark.asyncio
async def test_fetch_info_async(aconn):
info = await EnumInfo.fetch(aconn, "PureTestEnum")
assert info.name == "puretestenum"
assert MultirangeInfo.fetch(conn, "nosuchrange") is None
-@pytest.mark.asyncio
@pytest.mark.parametrize("name, subtype", fetch_cases)
async def test_fetch_info_async(aconn, testmr, name, subtype): # noqa: F811
info = await MultirangeInfo.fetch(aconn, name)
assert info.subtype_oid == aconn.adapters.types[subtype].oid
-@pytest.mark.asyncio
async def test_fetch_info_not_found_async(aconn):
assert await MultirangeInfo.fetch(aconn, "nosuchrange") is None
assert RangeInfo.fetch(conn, "nosuchrange") is None
-@pytest.mark.asyncio
@pytest.mark.parametrize("name, subtype", fetch_cases)
async def test_fetch_info_async(aconn, testrange, name, subtype):
info = await RangeInfo.fetch(aconn, name)
assert info.subtype_oid == aconn.adapters.types[subtype].oid
-@pytest.mark.asyncio
async def test_fetch_info_not_found_async(aconn):
assert await RangeInfo.fetch(aconn, "nosuchrange") is None