from psycopg.pq import TransactionStatus
from psycopg.rows import Row, TupleRow, class_row
+from .. import acompat
from ..utils import assert_type, set_autocommit, skip_free_threaded
-from ..acompat import Event, gather, skip_sync, sleep, spawn
+from ..acompat import Event, gather, sleep, spawn
from .test_pool_common import delay_connection
try:
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.parametrize("async_cb", [pytest.param(True, marks=skip_sync), False])
+@pytest.mark.parametrize(
+ "async_cb", [pytest.param(True, marks=acompat.skip_sync), False]
+)
def test_reconnect_failure(proxy, async_cb):
proxy.start()
logger.setLevel(old_level)
-@skip_sync
-def test_cancellation_in_queue(dsn):
- # https://github.com/psycopg/psycopg/issues/509
-
- nconns = 3
-
- with pool.ConnectionPool(dsn, min_size=nconns, timeout=1) as p:
- p.wait()
-
- got_conns = []
- ev = Event()
-
- def worker(i):
- try:
- logging.info("worker %s started", i)
- with p.connection() as conn:
- logging.info("worker %s got conn", i)
- cur = conn.execute("select 1")
- assert cur.fetchone() == (1,)
-
- got_conns.append(conn)
- if len(got_conns) >= nconns:
- ev.set()
-
- sleep(5)
- except BaseException as ex:
- logging.info("worker %s stopped: %r", i, ex)
- raise
-
- # Start tasks taking up all the connections and getting in the queue
- tasks = [spawn(worker, (i,)) for i in range(nconns * 3)]
-
- # wait until the pool has served all the connections and clients are queued.
- assert ev.wait(3.0)
- for i in range(10):
- if p.get_stats().get("requests_queued", 0):
- break
- else:
- sleep(0.1)
- else:
- pytest.fail("no client got in the queue")
-
- [task.cancel() for task in reversed(tasks)]
- gather(*tasks, return_exceptions=True, timeout=1.0)
-
- stats = p.get_stats()
- assert stats["pool_available"] == 3
- assert stats.get("requests_waiting", 0) == 0
-
- with p.connection() as conn:
- cur = conn.execute("select 1")
- assert cur.fetchone() == (1,)
-
-
@pytest.mark.slow
@pytest.mark.timing
def test_check_backoff(dsn, caplog, monkeypatch):
from psycopg.pq import TransactionStatus
from psycopg.rows import Row, TupleRow, class_row
+from .. import acompat
from ..utils import assert_type, set_autocommit, skip_free_threaded
from ..acompat import AEvent, asleep, gather, skip_sync, spawn
from .test_pool_common_async import delay_connection
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.parametrize("async_cb", [pytest.param(True, marks=skip_sync), False])
+@pytest.mark.parametrize(
+ "async_cb", [pytest.param(True, marks=acompat.skip_sync), False]
+)
async def test_reconnect_failure(proxy, async_cb):
proxy.start()
import logging
from time import time
from typing import Any
-from asyncio import CancelledError
import pytest
import psycopg
from ..utils import set_autocommit
-from ..acompat import Event, gather, is_alive, skip_async, skip_sync, sleep, spawn
+from ..acompat import Event, gather, is_alive, skip_async, sleep, spawn
try:
import psycopg_pool as pool
assert time() - t0 <= 1.5
-@skip_sync
-def test_cancellation_in_queue(pool_cls, dsn):
- # https://github.com/psycopg/psycopg/issues/509
-
- nconns = 3
-
- with pool_cls(
- dsn, min_size=min_size(pool_cls, nconns), max_size=nconns, timeout=1
- ) as p:
- p.wait()
-
- got_conns = []
- ev = Event()
-
- def worker(i):
- try:
- logging.info("worker %s started", i)
- with p.connection() as conn:
- logging.info("worker %s got conn", i)
- cur = conn.execute("select 1")
- assert cur.fetchone() == (1,)
-
- got_conns.append(conn)
- if len(got_conns) >= nconns:
- ev.set()
-
- sleep(5)
- except BaseException as ex:
- logging.info("worker %s stopped: %r", i, ex)
- raise
-
- # Start tasks taking up all the connections and getting in the queue
- tasks = [spawn(worker, (i,)) for i in range(nconns * 3)]
-
- # wait until the pool has served all the connections and clients are queued.
- assert ev.wait(3.0)
- for i in range(10):
- if p.get_stats().get("requests_queued", 0):
- break
- else:
- sleep(0.1)
- else:
- pytest.fail("no client got in the queue")
-
- [task.cancel() for task in reversed(tasks)]
- gather(*tasks, return_exceptions=True, timeout=1.0)
-
- stats = p.get_stats()
- assert stats["pool_available"] == min_size(pool_cls, nconns)
- assert stats.get("requests_waiting", 0) == 0
-
- with p.connection() as conn:
- cur = conn.execute("select 1")
- assert cur.fetchone() == (1,)
-
-
-@skip_sync
-def test_cancel_on_check(pool_cls, dsn):
- do_cancel = True
-
- def check(conn):
- nonlocal do_cancel
- if do_cancel:
- do_cancel = False
- raise CancelledError()
-
- pool_cls.check_connection(conn)
-
- with pool_cls(dsn, min_size=min_size(pool_cls, 1), check=check, timeout=1.0) as p:
- try:
- with p.connection() as conn:
- conn.execute("select 1")
- except CancelledError:
- pass
-
- with p.connection() as conn:
- conn.execute("select 1")
-
-
-@skip_sync
-def test_cancel_on_rollback(pool_cls, dsn, monkeypatch):
- do_cancel = False
-
- with pool_cls(dsn, min_size=min_size(pool_cls, 1), timeout=1.0) as p:
- with p.connection() as conn:
-
- def rollback(self):
- if do_cancel:
- raise CancelledError()
- else:
- type(self).rollback(self)
-
- monkeypatch.setattr(type(conn), "rollback", rollback)
- conn.execute("select 1")
-
- do_cancel = True
- with pytest.raises((psycopg.errors.SyntaxError, CancelledError)):
- with p.connection() as conn:
- conn.execute("selexx 2")
-
- do_cancel = False
- with p.connection() as conn:
- cur = conn.execute("select 3")
- assert cur.fetchone() == (3,)
-
-
@pytest.mark.crdb_skip("backend pid")
def test_drain(pool_cls, dsn):
pids1 = set()
import logging
from time import time
from typing import Any
-from asyncio import CancelledError
import pytest
@skip_sync
async def test_cancel_on_check(pool_cls, dsn):
+ from asyncio import CancelledError
+
do_cancel = True
async def check(conn):
@skip_sync
async def test_cancel_on_rollback(pool_cls, dsn, monkeypatch):
+ from asyncio import CancelledError
+
do_cancel = False
async with pool_cls(dsn, min_size=min_size(pool_cls, 1), timeout=1.0) as p:
from psycopg.rows import Row, TupleRow, class_row
from ..utils import assert_type, set_autocommit
-from ..acompat import Event, gather, skip_sync, sleep, spawn
+from ..acompat import gather, sleep, spawn
from .test_pool_common import delay_connection, ensure_waiting
try:
assert 200 <= stats["connections_ms"] < 300
-@skip_sync
-def test_cancellation_in_queue(dsn):
- # https://github.com/psycopg/psycopg/issues/509
-
- nconns = 3
-
- with pool.NullConnectionPool(dsn, min_size=0, max_size=nconns, timeout=1) as p:
- p.wait()
-
- got_conns = []
- ev = Event()
-
- def worker(i):
- try:
- logging.info("worker %s started", i)
- with p.connection() as conn:
- logging.info("worker %s got conn", i)
- cur = conn.execute("select 1")
- assert cur.fetchone() == (1,)
-
- got_conns.append(conn)
- if len(got_conns) >= nconns:
- ev.set()
-
- sleep(5)
- except BaseException as ex:
- logging.info("worker %s stopped: %r", i, ex)
- raise
-
- # Start tasks taking up all the connections and getting in the queue
- tasks = [spawn(worker, (i,)) for i in range(nconns * 3)]
-
- # wait until the pool has served all the connections and clients are queued.
- assert ev.wait(3.0)
- for i in range(10):
- if p.get_stats().get("requests_queued", 0):
- break
- else:
- sleep(0.1)
- else:
- pytest.fail("no client got in the queue")
-
- [task.cancel() for task in reversed(tasks)]
- gather(*tasks, return_exceptions=True, timeout=1.0)
-
- stats = p.get_stats()
- assert stats.get("requests_waiting", 0) == 0
-
- with p.connection() as conn:
- cur = conn.execute("select 1")
- assert cur.fetchone() == (1,)
-
-
def test_close_returns(dsn):
# Mostly test the interface; close is close even if it goes via putconn().
with pool.NullConnectionPool(dsn, close_returns=True) as p:
from psycopg.rows import Row, TupleRow, class_row
from ..utils import assert_type, set_autocommit
-from ..acompat import AEvent, asleep, gather, skip_sync, spawn
+from ..acompat import asleep, gather, skip_sync, spawn
from .test_pool_common_async import delay_connection, ensure_waiting
try:
@skip_sync
async def test_cancellation_in_queue(dsn):
+ from ..acompat import AEvent
+
# https://github.com/psycopg/psycopg/issues/509
nconns = 3
from psycopg.conninfo import conninfo_to_dict, make_conninfo, timeout_from_conninfo
from psycopg._conninfo_utils import get_param
-from .acompat import skip_async, skip_sync, sleep
+from .acompat import skip_async, sleep
from .test_adapt import make_bin_dumper, make_dumper
from ._test_cursor import my_row_factory
from ._test_connection import testctx # noqa: F401 # fixture
assert conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
-@skip_sync
-def test_autocommit_readonly_property(conn):
- with pytest.raises(AttributeError):
- conn.autocommit = True
- assert not conn.autocommit
-
-
def test_autocommit(conn):
assert conn.autocommit is False
conn.set_autocommit(True)
assert current == default
-@skip_sync
-@pytest.mark.parametrize("param", tx_params)
-def test_transaction_param_readonly_property(conn, param):
- with pytest.raises(AttributeError):
- setattr(conn, param.name, None)
-
-
@pytest.mark.parametrize("autocommit", [True, False])
@pytest.mark.parametrize("param", tx_params_isolation)
def test_set_transaction_param_implicit(conn, param, autocommit):
"wait_timeout": "wait",
}
_skip_imports = {
- "acompat": {"alist", "anext"},
+ "acompat": {"alist", "anext", "skip_sync"},
"_acompat": {"ensure_async"},
}
return node
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> ast.AST:
+ for deco in node.decorator_list:
+ match deco:
+ case ast.Name(id="skip_sync"):
+ return None
self._fix_docstring(node.body)
node.name = self.names_map.get(node.name, node.name)
for arg in node.args.args: