Add @skip_sync, @skip_async markers to mark the functions to skip.
import contextlib
from typing import Any
+import pytest
+
# Re-exports
sleep = time.sleep
Event = threading.Event
closing = contextlib.closing
+# Markers to decorate tests to run only in async or only in sync version.
+skip_sync = pytest.mark.skipif("'async' not in __name__", reason="async test only")
+skip_async = pytest.mark.skipif("'async' in __name__", reason="sync test only")
+
def is_async(obj):
"""Return true if obj is an async object (class, instance, module name)"""
from psycopg.rows import class_row, Row, TupleRow
from psycopg._compat import assert_type, Counter
-from ..acompat import Event, spawn, gather, sleep, is_async
+from ..acompat import Event, spawn, gather, sleep, skip_sync
from .test_pool_common import delay_connection
try:
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.parametrize("async_cb", [True, False])
+@pytest.mark.parametrize("async_cb", [pytest.param(True, marks=skip_sync), False])
def test_reconnect_failure(proxy, async_cb):
- if async_cb and (not is_async(__name__)):
- pytest.skip("async test only")
-
proxy.start()
t1 = None
logger.setLevel(old_level)
-@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+@skip_sync
def test_cancellation_in_queue(dsn):
# https://github.com/psycopg/psycopg/issues/509
from psycopg.rows import class_row, Row, TupleRow
from psycopg._compat import assert_type, Counter
-from ..acompat import AEvent, spawn, gather, asleep, is_async
+from ..acompat import AEvent, spawn, gather, asleep, skip_sync
from .test_pool_common_async import delay_connection
try:
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.parametrize("async_cb", [True, False])
+@pytest.mark.parametrize("async_cb", [pytest.param(True, marks=skip_sync), False])
async def test_reconnect_failure(proxy, async_cb):
- if async_cb and not is_async(__name__):
- pytest.skip("async test only")
-
proxy.start()
t1 = None
logger.setLevel(old_level)
-@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+@skip_sync
async def test_cancellation_in_queue(dsn):
# https://github.com/psycopg/psycopg/issues/509
import psycopg
-from ..acompat import Event, spawn, gather, sleep, is_alive, is_async
+from ..acompat import Event, spawn, gather, sleep, is_alive, skip_async, skip_sync
try:
import psycopg_pool as pool
p2.putconn(conn)
+@skip_async
@pytest.mark.slow
-@pytest.mark.skipif(is_async(__name__), reason="sync test only")
def test_del_stops_threads(pool_cls, dsn):
p = pool_cls(dsn)
assert p._sched_runner is not None
logger.setLevel(old_level)
-@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+@skip_sync
def test_cancellation_in_queue(pool_cls, dsn):
# https://github.com/psycopg/psycopg/issues/509
import psycopg
-from ..acompat import AEvent, spawn, gather, asleep, is_alive, is_async
+from ..acompat import AEvent, spawn, gather, asleep, is_alive, skip_async, skip_sync
try:
import psycopg_pool as pool
await p2.putconn(conn)
+@skip_async
@pytest.mark.slow
-@pytest.mark.skipif(is_async(__name__), reason="sync test only")
async def test_del_stops_threads(pool_cls, dsn):
p = pool_cls(dsn)
assert p._sched_runner is not None
logger.setLevel(old_level)
-@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+@skip_sync
async def test_cancellation_in_queue(pool_cls, dsn):
# https://github.com/psycopg/psycopg/issues/509
from psycopg.rows import class_row, Row, TupleRow
from psycopg._compat import assert_type
-from ..acompat import Event, sleep, spawn, gather, is_async
+from ..acompat import Event, sleep, spawn, gather, skip_sync
from .test_pool_common import delay_connection, ensure_waiting
try:
assert 200 <= stats["connections_ms"] < 300
-@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+@skip_sync
def test_cancellation_in_queue(dsn):
# https://github.com/psycopg/psycopg/issues/509
from psycopg.rows import class_row, Row, TupleRow
from psycopg._compat import assert_type
-from ..acompat import AEvent, asleep, spawn, gather, is_async
+from ..acompat import AEvent, asleep, spawn, gather, skip_sync
from .test_pool_common_async import delay_connection, ensure_waiting
try:
assert 200 <= stats["connections_ms"] < 300
-@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+@skip_sync
async def test_cancellation_in_queue(dsn):
# https://github.com/psycopg/psycopg/issues/509
from psycopg.conninfo import conninfo_to_dict, make_conninfo
from .utils import gc_collect
-from .acompat import is_async
+from .acompat import is_async, skip_sync, skip_async
from ._test_cursor import my_row_factory
from ._test_connection import tx_params, tx_params_isolation, tx_values_map
from ._test_connection import conninfo_params_timeout
assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
-@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+@skip_sync
def test_autocommit_readonly_property(conn):
with pytest.raises(AttributeError):
conn.autocommit = True
assert conn.autocommit is True
-@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+@skip_async
def test_autocommit_property(conn):
assert conn.autocommit is False
assert current == default
-@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+@skip_sync
@pytest.mark.parametrize("param", tx_params)
def test_transaction_param_readonly_property(conn, param):
with pytest.raises(AttributeError):
getattr(conn, f"set_{param.name}")(value)
-@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+@skip_async
@pytest.mark.crdb("skip", reason="transaction isolation")
def test_set_transaction_param_all_property(conn):
params: List[Any] = tx_params[:]
assert conn.deferrable is False
-@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+@skip_async
def test_set_transaction_param_strange_property(conn):
for val in ("asdf", 0, 5):
with pytest.raises(ValueError):
from psycopg.conninfo import conninfo_to_dict, make_conninfo
from .utils import gc_collect
-from .acompat import is_async
+from .acompat import is_async, skip_sync, skip_async
from ._test_cursor import my_row_factory
from ._test_connection import tx_params, tx_params_isolation, tx_values_map
from ._test_connection import conninfo_params_timeout
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
-@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+@skip_sync
async def test_autocommit_readonly_property(aconn):
with pytest.raises(AttributeError):
aconn.autocommit = True
assert aconn.autocommit is True
-@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+@skip_async
def test_autocommit_property(conn):
assert conn.autocommit is False
assert current == default
-@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+@skip_sync
@pytest.mark.parametrize("param", tx_params)
async def test_transaction_param_readonly_property(aconn, param):
with pytest.raises(AttributeError):
await getattr(aconn, f"set_{param.name}")(value)
-@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+@skip_async
@pytest.mark.crdb("skip", reason="transaction isolation")
def test_set_transaction_param_all_property(conn):
params: List[Any] = tx_params[:]
assert aconn.deferrable is False
-@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+@skip_async
def test_set_transaction_param_strange_property(conn):
for val in ("asdf", 0, 5):
with pytest.raises(ValueError):