Enable cancel-based tests on CRDB 22.
"large objects": 243,
"negative interval": 81577,
"nested array": 32552,
+ "no col query": None,
"notify": 41522,
"password_encryption": 42519,
"pg_terminate_backend": 35897,
@pytest.fixture
-@pytest.mark.crdb("skip", reason="2-phase commit")
+@pytest.mark.crdb_skip("2-phase commit")
def tpc(svcconn):
tpc = Tpc(svcconn)
tpc.check_tpc()
assert conn.autocommit
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_its_no_pool_at_all(dsn):
with NullConnectionPool(dsn, max_size=2) as p:
with p.connection() as conn:
assert "WAT" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_reset(dsn):
resets = 0
assert pids[0] == pids[1]
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_reset_badstate(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert "INTRANS" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_reset_broken(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_queue(dsn):
def worker(n):
t0 = time()
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_queue_timeout(dsn):
def worker(n):
t0 = time()
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_queue_timeout_override(dsn):
def worker(n):
t0 = time()
assert 0.1 < e[1] < 0.15
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_broken_reconnect(dsn):
with NullConnectionPool(dsn, max_size=1) as p:
with p.connection() as conn:
assert pid1 != pid2
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_intrans_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
pids = []
assert "INTRANS" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_inerror_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
pids = []
assert "INERROR" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_active_close(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
pids = []
assert "BAD" in caplog.records[1].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_fail_rollback_close(dsn, caplog, monkeypatch):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
pids = []
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_max_lifetime(dsn):
pids = []
assert conn.autocommit
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_its_no_pool_at_all(dsn):
async with AsyncNullConnectionPool(dsn, max_size=2) as p:
async with p.connection() as conn:
assert "WAT" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_reset(dsn):
resets = 0
assert pids[0] == pids[1]
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_reset_badstate(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert "INTRANS" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_reset_broken(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_queue(dsn):
async def worker(n):
t0 = time()
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_queue_timeout(dsn):
async def worker(n):
t0 = time()
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_queue_timeout_override(dsn):
async def worker(n):
t0 = time()
assert 0.1 < e[1] < 0.15
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_broken_reconnect(dsn):
async with AsyncNullConnectionPool(dsn, max_size=1) as p:
async with p.connection() as conn:
assert pid1 != pid2
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_intrans_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
pids = []
assert "INTRANS" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_inerror_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
pids = []
assert "INERROR" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_active_close(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
pids = []
assert "BAD" in caplog.records[1].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_fail_rollback_close(dsn, caplog, monkeypatch):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
pids = []
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_max_lifetime(dsn):
pids: List[int] = []
assert conn.autocommit
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_its_really_a_pool(dsn):
with pool.ConnectionPool(dsn, min_size=2) as p:
with p.connection() as conn:
assert p.closed
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_connection_not_lost(dsn):
with pool.ConnectionPool(dsn, min_size=1) as p:
with pytest.raises(ZeroDivisionError):
assert resets == 2
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_reset_badstate(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert "INTRANS" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_reset_broken(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_queue(dsn):
def worker(n):
t0 = time()
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_queue_timeout(dsn):
def worker(n):
t0 = time()
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_queue_timeout_override(dsn):
def worker(n):
t0 = time()
assert 0.1 < e[1] < 0.15
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_broken_reconnect(dsn):
with pool.ConnectionPool(dsn, min_size=1) as p:
with p.connection() as conn:
assert pid1 != pid2
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_intrans_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert "INTRANS" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_inerror_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert "INERROR" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_active_close(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert "BAD" in caplog.records[1].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_fail_rollback_close(dsn, caplog, monkeypatch):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_max_lifetime(dsn):
with pool.ConnectionPool(dsn, min_size=1, max_lifetime=0.2) as p:
sleep(0.1)
assert pids[0] == pids[1] != pids[4], pids
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
def test_check(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
with pool.ConnectionPool(dsn, min_size=4) as p:
assert conn.autocommit
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_its_really_a_pool(dsn):
async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
async with p.connection() as conn:
assert p.closed
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_connection_not_lost(dsn):
async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
with pytest.raises(ZeroDivisionError):
assert resets == 2
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_reset_badstate(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert "INTRANS" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_reset_broken(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_queue(dsn):
async def worker(n):
t0 = time()
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_queue_timeout(dsn):
async def worker(n):
t0 = time()
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_queue_timeout_override(dsn):
async def worker(n):
t0 = time()
assert 0.1 < e[1] < 0.15
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_broken_reconnect(dsn):
async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
async with p.connection() as conn:
assert pid1 != pid2
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_intrans_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert "INTRANS" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_inerror_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert "INERROR" in caplog.records[0].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_active_close(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
assert "BAD" in caplog.records[1].message
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_fail_rollback_close(dsn, caplog, monkeypatch):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_max_lifetime(dsn):
async with pool.AsyncConnectionPool(dsn, min_size=1, max_lifetime=0.2) as p:
await asyncio.sleep(0.1)
assert pids[0] == pids[1] != pids[4], pids
-@pytest.mark.crdb("skip", reason="backend pid")
+@pytest.mark.crdb_skip("backend pid")
async def test_check(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg.pool")
async with pool.AsyncConnectionPool(dsn, min_size=4) as p:
pgconn.send_describe_prepared(b"prep")
-@pytest.mark.crdb("skip", reason="server-side cursor")
+@pytest.mark.crdb_skip("server-side cursor")
def test_send_describe_portal(pgconn):
res = pgconn.exec_(
b"""
import psycopg
from psycopg import pq
-pytestmark = pytest.mark.crdb("skip", reason="copy")
+pytestmark = pytest.mark.crdb_skip("copy")
sample_values = "values (10::int, 20::int, 'hello'::text), (40, NULL, 'world')"
assert res.get_value(0, 0) == out
-@pytest.mark.crdb("skip", reason="server-side cursor")
+@pytest.mark.crdb_skip("server-side cursor")
def test_describe_portal(pgconn):
res = pgconn.exec_(
b"""
assert "NULL" in pq.error_message(pgconn)
-@pytest.mark.crdb("skip", reason="encoding")
+@pytest.mark.crdb_skip("encoding")
def test_error_message_encoding(pgconn):
res = pgconn.exec_(b"set client_encoding to latin9")
assert res.status == pq.ExecStatus.COMMAND_OK
pgconn.info
-@pytest.mark.crdb("skip", reason="pg_terminate_backend")
+@pytest.mark.crdb_skip("pg_terminate_backend")
def test_reset(pgconn):
assert pgconn.status == pq.ConnStatus.OK
pgconn.exec_(b"select pg_terminate_backend(pg_backend_pid())")
assert pgconn.status == pq.ConnStatus.BAD
-@pytest.mark.crdb("skip", reason="pg_terminate_backend")
+@pytest.mark.crdb_skip("pg_terminate_backend")
def test_reset_async(pgconn):
assert pgconn.status == pq.ConnStatus.OK
pgconn.exec_(b"select pg_terminate_backend(pg_backend_pid())")
pgconn.parameter_status(b"application_name")
-@pytest.mark.crdb("skip", reason="encoding")
+@pytest.mark.crdb_skip("encoding")
def test_encoding(pgconn):
res = pgconn.exec_(b"set client_encoding to latin1")
assert res.status == pq.ExecStatus.COMMAND_OK
cancel.free()
-@pytest.mark.crdb("skip", reason="notify")
+@pytest.mark.crdb_skip("notify")
def test_notify(pgconn):
assert pgconn.notifies() is None
assert pgconn.notifies() is None
-@pytest.mark.crdb("skip", reason="do")
+@pytest.mark.crdb_skip("do")
def test_notice_nohandler(pgconn):
pgconn.exec_(b"set client_min_messages to notice")
res = pgconn.exec_(
assert res.status == pq.ExecStatus.COMMAND_OK
-@pytest.mark.crdb("skip", reason="do")
+@pytest.mark.crdb_skip("do")
def test_notice(pgconn):
msgs = []
assert msgs and msgs[0] == b"hello notice"
-@pytest.mark.crdb("skip", reason="do")
+@pytest.mark.crdb_skip("do")
def test_notice_error(pgconn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
@pytest.mark.libpq(">= 10")
-@pytest.mark.crdb("skip", reason="password_encryption")
+@pytest.mark.crdb_skip("password_encryption")
def test_encrypt_password_query(pgconn):
res = pgconn.exec_(b"set password_encryption to 'md5'")
assert res.status == pq.ExecStatus.COMMAND_OK, pgconn.error_message.decode()
assert cur._query.params == (b"3", b"4")
-@pytest.mark.crdb("skip", reason="copy")
+@pytest.mark.crdb_skip("copy")
@pytest.mark.parametrize("ph, params", [("%s", (10,)), ("%(n)s", {"n": 10})])
def test_copy_out_param(conn, ph, params):
cur = conn.cursor()
unpickled = pickle.loads(pickled)
assert [tuple(d) for d in description] == [tuple(d) for d in unpickled]
- @pytest.mark.crdb("skip", reason="no col query")
+ @pytest.mark.crdb_skip("no col query")
def test_no_col_query(self, conn):
cur = conn.execute("select")
assert cur.description == []
assert cur._query.params == (b"3", b"4")
-@pytest.mark.crdb("skip", reason="copy")
+@pytest.mark.crdb_skip("copy")
@pytest.mark.parametrize("ph, params", [("%s", (10,)), ("%(n)s", {"n": 10})])
async def test_copy_out_param(aconn, ph, params):
cur = aconn.cursor()
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="notify")
+@pytest.mark.crdb_skip("notify")
def test_notifies(conn_cls, conn, dsn):
nconn = conn_cls.connect(dsn, autocommit=True)
npid = nconn.pgconn.backend_pid
@pytest.mark.slow
-@pytest.mark.crdb("skip", reason="cancel")
+@pytest.mark.crdb_skip("cancel")
def test_cancel(conn):
errors: List[Exception] = []
@pytest.mark.slow
-@pytest.mark.crdb("skip", reason="cancel")
+@pytest.mark.crdb_skip("cancel")
def test_cancel_stream(conn):
errors: List[Exception] = []
t.join()
-@pytest.mark.crdb("skip", reason="pg_terminate_backend")
+@pytest.mark.crdb_skip("pg_terminate_backend")
@pytest.mark.slow
def test_identify_closure(conn_cls, dsn):
def closer():
@pytest.mark.skipif(
sys.platform == "win32", reason="don't know how to Ctrl-C on Windows"
)
-@pytest.mark.crdb("skip", reason="cancel")
+@pytest.mark.crdb_skip("cancel")
def test_ctrl_c(dsn):
if sys.platform == "win32":
sig = int(signal.CTRL_C_EVENT)
@pytest.mark.slow
@pytest.mark.timing
-@pytest.mark.crdb("skip", reason="notify")
+@pytest.mark.crdb_skip("notify")
async def test_notifies(aconn_cls, aconn, dsn):
nconn = await aconn_cls.connect(dsn, autocommit=True)
npid = nconn.pgconn.backend_pid
@pytest.mark.slow
-@pytest.mark.crdb("skip", reason="cancel")
+@pytest.mark.crdb_skip("cancel")
async def test_cancel(aconn):
async def worker():
cur = aconn.cursor()
@pytest.mark.slow
-@pytest.mark.crdb("skip", reason="cancel")
+@pytest.mark.crdb_skip("cancel")
async def test_cancel_stream(aconn):
async def worker():
cur = aconn.cursor()
@pytest.mark.slow
-@pytest.mark.crdb("skip", reason="pg_terminate_backend")
+@pytest.mark.crdb_skip("pg_terminate_backend")
async def test_identify_closure(aconn_cls, dsn):
async def closer():
await asyncio.sleep(0.2)
@pytest.mark.skipif(
sys.platform == "win32", reason="don't know how to Ctrl-C on Windows"
)
-@pytest.mark.crdb("skip", reason="cancel")
+@pytest.mark.crdb_skip("cancel")
async def test_ctrl_c(dsn):
script = f"""\
import signal
cur.execute("select 1")
-@pytest.mark.crdb("skip", reason="pg_terminate_backend")
+@pytest.mark.crdb_skip("pg_terminate_backend")
def test_broken(conn):
with pytest.raises(psycopg.OperationalError):
conn.execute("select pg_terminate_backend(%s)", [conn.pgconn.backend_pid])
conn.close()
-@pytest.mark.crdb("skip", reason="pg_terminate_backend")
+@pytest.mark.crdb_skip("pg_terminate_backend")
def test_context_inerror_rollback_no_clobber(conn_cls, conn, dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
assert "in rollback" in rec.message
-@pytest.mark.crdb("skip", reason="copy")
+@pytest.mark.crdb_skip("copy")
def test_context_active_rollback_no_clobber(conn_cls, dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
conn.commit()
-@pytest.mark.crdb("skip", reason="deferrable")
+@pytest.mark.crdb_skip("deferrable")
def test_commit_error(conn):
conn.execute(
"""
conn_cls.connect(*args, **kwargs)
-@pytest.mark.crdb("skip", reason="pg_terminate_backend")
+@pytest.mark.crdb_skip("pg_terminate_backend")
def test_broken_connection(conn):
cur = conn.cursor()
with pytest.raises(psycopg.DatabaseError):
assert conn.closed
-@pytest.mark.crdb("skip", reason="do")
+@pytest.mark.crdb_skip("do")
def test_notice_handlers(conn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
messages = []
conn.remove_notice_handler(cb1)
-@pytest.mark.crdb("skip", reason="notify")
+@pytest.mark.crdb_skip("notify")
def test_notify_handlers(conn):
nots1 = []
nots2 = []
tx_params = [
param_isolation,
param_read_only,
- pytest.param(param_deferrable, marks=pytest.mark.crdb("skip", reason="deferrable")),
+ pytest.param(param_deferrable, marks=pytest.mark.crdb_skip("deferrable")),
]
tx_params_isolation = [
pytest.param(
marks=pytest.mark.crdb("skip", reason="transaction isolation"),
),
param_read_only,
- pytest.param(param_deferrable, marks=pytest.mark.crdb("skip", reason="deferrable")),
+ pytest.param(param_deferrable, marks=pytest.mark.crdb_skip("deferrable")),
]
await cur.execute("select 1")
-@pytest.mark.crdb("skip", reason="pg_terminate_backend")
+@pytest.mark.crdb_skip("pg_terminate_backend")
async def test_broken(aconn):
with pytest.raises(psycopg.OperationalError):
await aconn.execute(
await aconn.close()
-@pytest.mark.crdb("skip", reason="pg_terminate_backend")
+@pytest.mark.crdb_skip("pg_terminate_backend")
async def test_context_inerror_rollback_no_clobber(aconn_cls, conn, dsn, caplog):
with pytest.raises(ZeroDivisionError):
async with await aconn_cls.connect(dsn) as conn2:
assert "in rollback" in rec.message
-@pytest.mark.crdb("skip", reason="copy")
+@pytest.mark.crdb_skip("copy")
async def test_context_active_rollback_no_clobber(aconn_cls, dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
await aconn.commit()
-@pytest.mark.crdb("skip", reason="deferrable")
+@pytest.mark.crdb_skip("deferrable")
async def test_commit_error(aconn):
await aconn.execute(
"""
await aconn_cls.connect(*args, **kwargs)
-@pytest.mark.crdb("skip", reason="pg_terminate_backend")
+@pytest.mark.crdb_skip("pg_terminate_backend")
async def test_broken_connection(aconn):
cur = aconn.cursor()
with pytest.raises(psycopg.DatabaseError):
assert aconn.closed
-@pytest.mark.crdb("skip", reason="do")
+@pytest.mark.crdb_skip("do")
async def test_notice_handlers(aconn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
messages = []
aconn.remove_notice_handler(cb1)
-@pytest.mark.crdb("skip", reason="notify")
+@pytest.mark.crdb_skip("notify")
async def test_notify_handlers(aconn):
nots1 = []
nots2 = []
with pytest.raises(psycopg.OperationalError):
conn.info.error_message
- @pytest.mark.crdb("skip", reason="backend pid")
+ @pytest.mark.crdb_skip("backend pid")
def test_backend_pid(self, conn):
assert conn.info.backend_pid
assert conn.info.backend_pid == conn.pgconn.backend_pid
assert clienc.replace("-", "").replace("_", "").upper() == out
assert conn.info.encoding == codec
- @pytest.mark.crdb("skip", reason="encoding")
+ @pytest.mark.crdb_skip("encoding")
def test_set_encoding_unsupported(self, conn):
cur = conn.cursor()
cur.execute("set client_encoding to EUC_TW")
from .utils import eur, gc_collect
-pytestmark = pytest.mark.crdb("skip", reason="copy")
+pytestmark = pytest.mark.crdb_skip("copy")
sample_records = [(10, 20, "hello"), (40, None, "world")]
sample_values = "values (10::int, 20::int, 'hello'::text), (40, NULL, 'world')"
pytestmark = [
pytest.mark.asyncio,
- pytest.mark.crdb("skip", reason="copy"),
+ pytest.mark.crdb_skip("copy"),
]
assert recs == []
-@pytest.mark.crdb("skip", reason="no col query")
+@pytest.mark.crdb_skip("no col query")
def test_stream_no_col(conn):
cur = conn.cursor()
recs = list(cur.stream("select"))
unpickled = pickle.loads(pickled)
assert [tuple(d) for d in description] == [tuple(d) for d in unpickled]
- @pytest.mark.crdb("skip", reason="no col query")
+ @pytest.mark.crdb_skip("no col query")
def test_no_col_query(self, conn):
cur = conn.execute("select")
assert cur.description == []
assert recs == []
-@pytest.mark.crdb("skip", reason="no col query")
+@pytest.mark.crdb_skip("no col query")
async def test_stream_no_col(aconn):
cur = aconn.cursor()
recs = [rec async for rec in cur.stream("select")]
assert diag.severity_nonlocalized == "ERROR"
-@pytest.mark.crdb("skip", reason="do")
+@pytest.mark.crdb_skip("do")
@pytest.mark.parametrize("enc", ["utf8", "latin9"])
def test_diag_encoding(conn, enc):
msgs = []
assert msgs == [f"hello {eur}"]
-@pytest.mark.crdb("skip", reason="do")
+@pytest.mark.crdb_skip("do")
@pytest.mark.parametrize("enc", ["utf8", "latin9"])
def test_error_encoding(conn, enc):
with conn.transaction():
assert exc2.value.diag.sqlstate == "42P01"
-@pytest.mark.crdb("skip", reason="deferrable")
+@pytest.mark.crdb_skip("deferrable")
def test_diag_from_commit(conn):
cur = conn.cursor()
cur.execute(
@pytest.mark.asyncio
-@pytest.mark.crdb("skip", reason="deferrable")
+@pytest.mark.crdb_skip("deferrable")
async def test_diag_from_commit_async(aconn):
cur = aconn.cursor()
await cur.execute(
assert not s.endswith("\n")
-@pytest.mark.crdb("skip", reason="do")
+@pytest.mark.crdb_skip("do")
def test_unknown_sqlstate(conn):
code = "PXX99"
with pytest.raises(KeyError):
assert cur2.fetchone() == (2,)
-@pytest.mark.crdb("skip", reason="deferrable")
+@pytest.mark.crdb_skip("deferrable")
def test_error_on_commit(conn):
conn.execute(
"""
assert await cur2.fetchone() == (2,)
-@pytest.mark.crdb("skip", reason="deferrable")
+@pytest.mark.crdb_skip("deferrable")
async def test_error_on_commit(aconn):
await aconn.execute(
"""
"query",
[
"create table test_no_prepare ()",
- pytest.param(
- "notify foo, 'bar'", marks=pytest.mark.crdb("skip", reason="notify")
- ),
+ pytest.param("notify foo, 'bar'", marks=pytest.mark.crdb_skip("notify")),
"set timezone = utc",
"select num from prepared_test",
"insert into prepared_test (num) values (1)",
"query",
[
"create table test_no_prepare ()",
- pytest.param(
- "notify foo, 'bar'", marks=pytest.mark.crdb("skip", reason="notify")
- ),
+ pytest.param("notify foo, 'bar'", marks=pytest.mark.crdb_skip("notify")),
"set timezone = utc",
"select num from prepared_test",
"insert into prepared_test (num) values (1)",
cur.fetchone()
-@pytest.mark.crdb("skip", reason="no col query")
+@pytest.mark.crdb_skip("no col query")
@pytest.mark.parametrize(
"factory", "tuple_row dict_row namedtuple_row args_row".split()
)
from psycopg import rows, errors as e
from psycopg.pq import Format
-pytestmark = pytest.mark.crdb("skip", reason="server-side cursor")
+pytestmark = pytest.mark.crdb_skip("server-side cursor")
def test_init_row_factory(conn):
pytestmark = [
pytest.mark.asyncio,
- pytest.mark.crdb("skip", reason="server-side cursor"),
+ pytest.mark.crdb_skip("server-side cursor"),
]
cur.execute("select * from test_compose")
assert cur.fetchall() == [(10, "a", "b", "c"), (20, "d", "e", "f")]
- @pytest.mark.crdb("skip", reason="copy")
+ @pytest.mark.crdb_skip("copy")
def test_copy(self, conn):
cur = conn.cursor()
cur.execute(
conn.adapters.register_dumper(str, StrDumper)
assert sql.Literal("foo").as_string(conn) == "'foo'"
- @pytest.mark.crdb("skip", reason="composite") # create type, actually
+ @pytest.mark.crdb_skip("composite") # create type, actually
@pytest.mark.parametrize("name", ["a-b", f"{eur}", "order", "foo bar"])
def test_invalid_name(self, conn, name):
conn.execute(
import psycopg
from psycopg.pq import TransactionStatus
-pytestmark = pytest.mark.crdb("skip", reason="2-phase commit")
+pytestmark = pytest.mark.crdb_skip("2-phase commit")
def test_tpc_disabled(conn, pipeline):
pytestmark = [
pytest.mark.asyncio,
- pytest.mark.crdb("skip", reason="2-phase commit"),
+ pytest.mark.crdb_skip("2-phase commit"),
]
assert not inserted(conn)
-@pytest.mark.crdb("skip", reason="pg_terminate_backend")
+@pytest.mark.crdb_skip("pg_terminate_backend")
def test_context_inerror_rollback_no_clobber(conn_cls, conn, pipeline, dsn, caplog):
if pipeline:
# Only 'conn' is possibly in pipeline mode, but the transaction and
assert "in rollback" in rec.message
-@pytest.mark.crdb("skip", reason="copy")
+@pytest.mark.crdb_skip("copy")
def test_context_active_rollback_no_clobber(conn_cls, dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
assert not await inserted(aconn)
-@pytest.mark.crdb("skip", reason="pg_terminate_backend")
+@pytest.mark.crdb_skip("pg_terminate_backend")
async def test_context_inerror_rollback_no_clobber(
aconn_cls, aconn, apipeline, dsn, caplog
):
assert "in rollback" in rec.message
-@pytest.mark.crdb("skip", reason="copy")
+@pytest.mark.crdb_skip("copy")
async def test_context_active_rollback_no_clobber(aconn_cls, dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
assert info is None
-@pytest.mark.crdb("skip", reason="composite")
+@pytest.mark.crdb_skip("composite")
@pytest.mark.parametrize(
"name", ["testschema.testtype", sql.Identifier("testschema", "testtype")]
)
assert cur.fetchone()[0]
-@pytest.mark.crdb("skip", reason="nested array")
+@pytest.mark.crdb_skip("nested array")
@pytest.mark.parametrize("fmt_in", PyFormat)
@pytest.mark.parametrize("obj, want", tests_str)
def test_dump_list_str(conn, obj, want, fmt_in):
assert cur.fetchone()[0] == []
-@pytest.mark.crdb("skip", reason="nested array")
+@pytest.mark.crdb_skip("nested array")
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("want, obj", tests_str)
def test_load_list_str(conn, obj, want, fmt_out):
]
-@pytest.mark.crdb("skip", reason="nested array")
+@pytest.mark.crdb_skip("nested array")
@pytest.mark.parametrize("obj, want", tests_int)
def test_dump_list_int(conn, obj, want):
cur = conn.cursor()
tx.get_dumper(input, PyFormat.BINARY).dump(input)
-@pytest.mark.crdb("skip", reason="nested array")
+@pytest.mark.crdb_skip("nested array")
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("want, obj", tests_int)
def test_load_list_int(conn, obj, want, fmt_out):
assert got == want
-@pytest.mark.crdb("skip", reason="composite")
+@pytest.mark.crdb_skip("composite")
def test_array_register(conn):
conn.execute("create table mytype (data text)")
cur = conn.execute("""select '(foo)'::mytype, '{"(foo)"}'::mytype[]""")
assert cur.fetchall() == [([1.0],), ([],)]
-@pytest.mark.crdb("skip", reason="geometric types")
+@pytest.mark.crdb_skip("geometric types")
def test_dump_list_no_comma_separator(conn):
class Box:
def __init__(self, x1, y1, x2, y2):
assert got == "{(3,4),(1,2);(5,4),(3,2)}"
-@pytest.mark.crdb("skip", reason="geometric types")
+@pytest.mark.crdb_skip("geometric types")
def test_load_array_no_comma_separator(conn):
cur = conn.execute("select '{(2,2),(1,1);(5,6),(3,4)}'::box[]")
# Not parsed at the moment, but split ok on ; separator
assert cur.fetchone()[0] == ["(2,2),(1,1)", "(5,6),(3,4)"]
-@pytest.mark.crdb("skip", reason="array with bounds")
+@pytest.mark.crdb_skip("nested array")
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize(
"obj, want",
assert got == want
-@pytest.mark.crdb("skip", reason="array with bounds")
+@pytest.mark.crdb_skip("nested array")
@pytest.mark.parametrize("fmt_out", pq.Format)
def test_all_chars_with_bounds(conn, fmt_out):
cur = conn.cursor(binary=fmt_out)
from ..fix_crdb import is_crdb, crdb_skip_message
-pytestmark = pytest.mark.crdb("skip", reason="composite")
+pytestmark = pytest.mark.crdb_skip("composite")
tests_str = [
("", ()),
assert rec[0] is True, type
assert rec[1] == val
- @pytest.mark.crdb("skip", reason="copy")
+ @pytest.mark.crdb_skip("copy")
def test_load_copy(self, conn):
cur = conn.cursor(binary=False)
with cur.copy(
assert rec[0] is True, type
assert rec[1] == val
- @pytest.mark.crdb("skip", reason="copy")
+ @pytest.mark.crdb_skip("copy")
def test_load_copy(self, conn):
cur = conn.cursor(binary=False)
with cur.copy(
with pytest.raises(DataError):
cur.fetchone()[0]
- @pytest.mark.crdb("skip", reason="copy")
+ @pytest.mark.crdb_skip("copy")
def test_load_copy(self, conn):
cur = conn.cursor(binary=False)
with cur.copy(
assert cur.fetchone()[0] == enum[label]
-@pytest.mark.crdb("skip", reason="encoding")
+@pytest.mark.crdb_skip("encoding")
@pytest.mark.parametrize("enum", enum_cases)
@pytest.mark.parametrize("fmt_in", PyFormat)
@pytest.mark.parametrize("fmt_out", pq.Format)
assert cur.fetchone()[0] == item
-@pytest.mark.crdb("skip", reason="encoding")
+@pytest.mark.crdb_skip("encoding")
@pytest.mark.parametrize("enum", enum_cases)
@pytest.mark.parametrize("fmt_in", PyFormat)
@pytest.mark.parametrize("fmt_out", pq.Format)
from psycopg.types import TypeInfo
from psycopg.types.hstore import HstoreLoader, register_hstore
-pytestmark = pytest.mark.crdb("skip", reason="hstore")
+pytestmark = pytest.mark.crdb_skip("hstore")
@pytest.mark.parametrize(
assert cur.fetchone()[0] is True
-@pytest.mark.crdb("skip", reason="json array")
+@pytest.mark.crdb_skip("json array")
@pytest.mark.parametrize("val", samples)
@pytest.mark.parametrize("wrapper", ["Json", "Jsonb"])
@pytest.mark.parametrize("fmt_in", PyFormat)
assert cur.fetchone()[0] == json.loads(val)
-@pytest.mark.crdb("skip", reason="json array")
+@pytest.mark.crdb_skip("json array")
@pytest.mark.parametrize("val", samples)
@pytest.mark.parametrize("jtype", ["json", "jsonb"])
@pytest.mark.parametrize("fmt_out", pq.Format)
assert cur.fetchone()[0] == [json.loads(val)]
-@pytest.mark.crdb("skip", reason="copy")
+@pytest.mark.crdb_skip("copy")
@pytest.mark.parametrize("val", samples)
@pytest.mark.parametrize("jtype", ["json", "jsonb"])
@pytest.mark.parametrize("fmt_out", pq.Format)
pytestmark = [
pytest.mark.pg(">= 14"),
- pytest.mark.crdb("skip", reason="range"),
+ pytest.mark.crdb_skip("range"),
]
from psycopg import sql
from psycopg.adapt import PyFormat
-crdb_skip_cidr = pytest.mark.crdb("skip", reason="cidr")
+crdb_skip_cidr = pytest.mark.crdb_skip("cidr")
@pytest.mark.parametrize("fmt_in", PyFormat)
assert val == got
-@pytest.mark.crdb("skip", reason="copy")
+@pytest.mark.crdb_skip("copy")
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("val", ["127.0.0.1/32", "::ffff:102:300/128"])
def test_inet_load_address(conn, fmt_out, val):
assert got == addr
-@pytest.mark.crdb("skip", reason="copy")
+@pytest.mark.crdb_skip("copy")
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("val", ["127.0.0.1/24", "::ffff:102:300/127"])
def test_inet_load_network(conn, fmt_out, val):
assert result == pytest.approx(want)
-@pytest.mark.crdb("skip", reason="copy")
+@pytest.mark.crdb_skip("copy")
def test_load_float_copy(conn):
cur = conn.cursor(binary=False)
with cur.copy("copy (select 3.14::float8, 'hi'::text) to stdout;") as copy:
assert r == (val, -val)
-@pytest.mark.crdb("skip", reason="numeric precision not maintained? TODOCRDB")
+@pytest.mark.crdb_skip("binary decimal")
@pytest.mark.parametrize(
"expr",
["NaN", "1", "1.0", "-1", "0.0", "0.01", "11", "1.1", "1.01", "0", "0.00"]
[
f
if f != PyFormat.BINARY
- else pytest.param(f, marks=pytest.mark.crdb("skip", reason="binary decimal"))
+ else pytest.param(f, marks=pytest.mark.crdb_skip("binary decimal"))
for f in PyFormat
],
)
from ..utils import eur
from ..fix_crdb import is_crdb, crdb_skip_message
-pytestmark = pytest.mark.crdb("skip", reason="range")
+pytestmark = pytest.mark.crdb_skip("range")
type2sub = {
"int4range": "int4",
assert res == ord(eur)
-@pytest.mark.crdb("skip", reason="encoding")
+@pytest.mark.crdb_skip("encoding")
@pytest.mark.parametrize("fmt_in", PyFormat)
def test_dump_badenc(conn, fmt_in):
cur = conn.cursor()
assert cur.fetchone()[0] == "foobar"
-@pytest.mark.crdb("skip", reason="copy")
+@pytest.mark.crdb_skip("copy")
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("encoding", ["utf8", crdb_encoding("latin9")])
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
assert res == eur
-@pytest.mark.crdb("skip", reason="encoding")
+@pytest.mark.crdb_skip("encoding")
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
def test_load_badenc(conn, typename, fmt_out):
copy.read_row()
-@pytest.mark.crdb("skip", reason="encoding")
+@pytest.mark.crdb_skip("encoding")
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
def test_load_ascii(conn, typename, fmt_out):
assert res == a
-@pytest.mark.crdb("skip", reason="encoding")
+@pytest.mark.crdb_skip("encoding")
@pytest.mark.parametrize("fmt_in", PyFormat)
@pytest.mark.parametrize("fmt_out", pq.Format)
def test_text_array_ascii(conn, fmt_in, fmt_out):
assert cur.fetchone()[0] is True
-@pytest.mark.crdb("skip", reason="copy")
+@pytest.mark.crdb_skip("copy")
@pytest.mark.parametrize("fmt_out", pq.Format)
def test_uuid_load(conn, fmt_out):
cur = conn.cursor(binary=fmt_out)