@contextlib.contextmanager
-def cancellable_query(pgconn: PGconn, monitor_conn: PGconn) -> Iterator[None]:
+def cancellable_query(pgconn: PGconn) -> Iterator[None]:
+ dsn = b" ".join(b"%s='%s'" % (i.keyword, i.val) for i in pgconn.info if i.val)
+ monitor_conn = pq.PGconn.connect(dsn)
+ assert (
+ monitor_conn.status == pq.ConnStatus.OK
+ ), f"bad connection: {monitor_conn.error_message.decode('utf8', 'replace')}"
+
pgconn.send_query_params(b"SELECT pg_sleep($1)", [b"180"])
+
while True:
r = monitor_conn.exec_(
b"SELECT count(*) FROM pg_stat_activity"
)
assert r.status == pq.ExecStatus.TUPLES_OK
if r.get_value(0, 0) != b"0":
+ del monitor_conn
break
time.sleep(0.01)
@pytest.mark.libpq(">= 17")
-def test_cancel_nonblocking(dsn):
- # mimic test_cancel() from src/test/modules/libpq_pipeline/libpq_pipeline.c
- def connect() -> PGconn:
- conn = pq.PGconn.connect(dsn.encode())
- if conn.status != pq.ConnStatus.OK:
- pytest.fail(
- f"bad connection: {conn.error_message.decode('utf8', 'replace')}"
- )
- return conn
-
- conn, monitor_conn = connect(), connect()
- conn.nonblocking = 1
+def test_cancel_conn_blocking(pgconn):
+ # test PQcancelBlocking, similarly to test_cancel() from
+ # src/test/modules/libpq_pipeline/libpq_pipeline.c
+ pgconn.nonblocking = 1
- # test PQcancel
- with cancellable_query(conn, monitor_conn):
- cancel = conn.get_cancel()
- cancel.cancel()
+ with cancellable_query(pgconn):
+ cancel_conn = pgconn.cancel_conn()
+ assert cancel_conn.status == pq.ConnStatus.ALLOCATED
+ cancel_conn.blocking()
+ assert cancel_conn.status == pq.ConnStatus.OK
- # PGcancel object can be reused for the next query
- with cancellable_query(conn, monitor_conn):
- cancel.cancel()
+ # test PQcancelReset works on the cancel connection and it can be reused
+ # after
+ cancel_conn.reset()
+ with cancellable_query(pgconn):
+ cancel_conn.blocking()
+ assert cancel_conn.status == pq.ConnStatus.OK
- del cancel
- # test PQcancelBlocking
- with cancellable_query(conn, monitor_conn):
- cancel_conn = conn.cancel_conn()
- assert cancel_conn.status == pq.ConnStatus.ALLOCATED
- cancel_conn.blocking()
- assert cancel_conn.status == pq.ConnStatus.OK # type: ignore[comparison-overlap] # noqa: E501
- cancel_conn.finish()
- del cancel_conn
+@pytest.mark.libpq(">= 17")
+def test_cancel_conn_nonblocking(pgconn):
+ # test PQcancelStart() and then polling with PQcancelPoll, similarly to
+ # test_cancel() from src/test/modules/libpq_pipeline/libpq_pipeline.c
+ pgconn.nonblocking = 1
wait_cancel = partial(wait, poll_method="poll", timeout=3)
# test PQcancelCreate and then polling with PQcancelPoll
- with cancellable_query(conn, monitor_conn):
- cancel_conn = conn.cancel_conn()
+ with cancellable_query(pgconn):
+ cancel_conn = pgconn.cancel_conn()
assert cancel_conn.status == pq.ConnStatus.ALLOCATED
cancel_conn.start()
assert cancel_conn.status == pq.ConnStatus.STARTED
# test PQcancelReset works on the cancel connection and it can be reused
# after
cancel_conn.reset()
- with cancellable_query(conn, monitor_conn):
+ with cancellable_query(pgconn):
cancel_conn.start()
wait_cancel(cancel_conn)
assert cancel_conn.status == pq.ConnStatus.OK
- cancel_conn.finish()
-
def test_cancel(pgconn):
cancel = pgconn.get_cancel()