From 118069cf9f8e1c588b58795268ee6c25c722285a Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 12 Jul 2025 01:13:32 +0200 Subject: [PATCH] fix(c): avoid a TypeError shadowing CancelledError on task cancellation Fix #1005. --- docs/news.rst | 2 + psycopg_c/psycopg_c/_psycopg/generators.pyx | 14 +++-- tests/test_concurrency_async.py | 69 +++++++++++++++++++++ 3 files changed, 81 insertions(+), 4 deletions(-) diff --git a/docs/news.rst b/docs/news.rst index 636b1f62d..121d3b9ed 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -21,6 +21,8 @@ Psycopg 3.3.0 (unreleased) Psycopg 3.2.10 (unreleased) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ +- Fix `!TypeError` shadowing `~asyncio.CancelledError` upon task cancellation + during pipeline execution (:ticket:`#1005`). - Fix memory leak when lambda/local functions are used as argument for `~.psycopg.types.json.set_json_dumps()`, `~.psycopg.types.json.set_json_loads()` (:ticket:`#1108`). diff --git a/psycopg_c/psycopg_c/_psycopg/generators.pyx b/psycopg_c/psycopg_c/_psycopg/generators.pyx index be45fe17f..8c58488ad 100644 --- a/psycopg_c/psycopg_c/_psycopg/generators.pyx +++ b/psycopg_c/psycopg_c/_psycopg/generators.pyx @@ -270,7 +270,7 @@ def pipeline_communicate( cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr cdef int cires cdef int status - cdef int ready + cdef int cready cdef libpq.PGresult *pgres cdef list res = [] cdef list results = [] @@ -278,11 +278,17 @@ def pipeline_communicate( while True: while True: + # I don't quite get why, but we can receive a None upon async + # task cancellation. See #1005. ready = yield WAIT_RW - if ready: + if ready is None: + continue + + cready = ready + if cready: break - if ready & READY_R: + if cready & READY_R: with nogil: cires = libpq.PQconsumeInput(pgconn_ptr) if 1 != cires: @@ -325,7 +331,7 @@ def pipeline_communicate( else: res.append(r) - if ready & READY_W: + if cready & READY_W: pgconn.flush() if not commands: break diff --git a/tests/test_concurrency_async.py b/tests/test_concurrency_async.py index c69954b13..8f2123745 100644 --- a/tests/test_concurrency_async.py +++ b/tests/test_concurrency_async.py @@ -402,3 +402,72 @@ async def test_transaction_concurrency(aconn, what): await asyncio.gather(t1) evs[2].set() await asyncio.gather(t2) + + +@pytest.mark.slow +def test_type_error_shadow(dsn): + script = f"""\ +import sys +import uuid +import asyncio + +import psycopg + +DSN = {dsn!r} +excs = [] + + +async def connect() -> psycopg.AsyncConnection: + return await psycopg.AsyncConnection.connect(DSN, autocommit=True) + + +async def worker(task_index: int, conn: psycopg.AsyncConnection) -> None: + try: + async with conn.cursor() as cur: + await cur.executemany( + "INSERT INTO lotsa_rows_table (id, task_index) VALUES (%s, %s)", + [(uuid.uuid4(), task_index) for i in range(50000)], + ) + except BaseException as be: + excs.append(be) + raise + + +async def crashing() -> None: + 1 / 0 + + +async def main() -> None: + async with await connect() as conn: + await conn.execute("DROP TABLE IF EXISTS lotsa_rows_table") + await conn.execute( + "CREATE TABLE lotsa_rows_table (id uuid PRIMARY KEY, task_index INT)" + ) + + # create three tasks that each insert a large number of rows + conns = [await connect() for i in range(3)] + tasks = [asyncio.create_task(worker(i, conn=conns[i])) for i in range(3)] + + # create one more task that fails immediately + tasks.append(asyncio.create_task(crashing())) + + try: + await asyncio.gather(*tasks) + except ZeroDivisionError: + pass + else: + raise AssertionError("ValueError not raised") + + +if __name__ == "__main__": + if sys.platform == "win32": + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + try: + asyncio.run(main()) + finally: + assert excs, "No exception raised by workers" + for ex in excs: + assert isinstance(ex, asyncio.CancelledError), f"raised {{ex!r}} instead" +""" + + sp.run([sys.executable, "-s"], input=script, text=True, check=True) -- 2.47.2