From: Daniele Varrazzo
Date: Fri, 11 Jul 2025 23:13:32 +0000 (+0200)
Subject: fix(c): avoid a TypeError shadowing CancelledError on task cancellation
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=18459817734c33a44430d5f6486b5dd8db5811de;p=thirdparty%2Fpsycopg.git

fix(c): avoid a TypeError shadowing CancelledError on task cancellation

Fix #1005.
---

diff --git a/docs/news.rst b/docs/news.rst
index de486cdd0..72c71ac01 100644
--- a/docs/news.rst
+++ b/docs/news.rst
@@ -10,6 +10,8 @@
 Psycopg 3.2.10 (unreleased)
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
+- Fix `!TypeError` shadowing `~asyncio.CancelledError` upon task cancellation
+  during pipeline execution (:ticket:`#1005`).
 - Fix memory leak when lambda/local functions are used as argument for
   `~.psycopg.types.json.set_json_dumps()`,
   `~.psycopg.types.json.set_json_loads()` (:ticket:`#1108`).
diff --git a/psycopg_c/psycopg_c/_psycopg/generators.pyx b/psycopg_c/psycopg_c/_psycopg/generators.pyx
index f85c821c4..47f4a2f83 100644
--- a/psycopg_c/psycopg_c/_psycopg/generators.pyx
+++ b/psycopg_c/psycopg_c/_psycopg/generators.pyx
@@ -270,7 +270,7 @@ def pipeline_communicate(
     cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
     cdef int cires
     cdef int status
-    cdef int ready
+    cdef int cready
     cdef libpq.PGresult *pgres
     cdef list res = []
     cdef list results = []
@@ -278,11 +278,17 @@
 
     while True:
         while True:
+            # I don't quite get why, but we can receive a None upon async
+            # task cancellation. See #1005.
             ready = yield WAIT_RW
-            if ready:
+            if ready is None:
+                continue
+
+            cready = ready
+            if cready:
                 break
 
-        if ready & READY_R:
+        if cready & READY_R:
             with nogil:
                 cires = libpq.PQconsumeInput(pgconn_ptr)
             if 1 != cires:
@@ -325,7 +331,7 @@
                 else:
                     res.append(r)
 
-        if ready & READY_W:
+        if cready & READY_W:
             pgconn.flush()
             if not commands:
                 break
diff --git a/tests/test_concurrency_async.py b/tests/test_concurrency_async.py
index c69954b13..8f2123745 100644
--- a/tests/test_concurrency_async.py
+++ b/tests/test_concurrency_async.py
@@ -402,3 +402,72 @@ async def test_transaction_concurrency(aconn, what):
     await asyncio.gather(t1)
     evs[2].set()
     await asyncio.gather(t2)
+
+
+@pytest.mark.slow
+def test_type_error_shadow(dsn):
+    script = f"""\
+import sys
+import uuid
+import asyncio
+
+import psycopg
+
+DSN = {dsn!r}
+excs = []
+
+
+async def connect() -> psycopg.AsyncConnection:
+    return await psycopg.AsyncConnection.connect(DSN, autocommit=True)
+
+
+async def worker(task_index: int, conn: psycopg.AsyncConnection) -> None:
+    try:
+        async with conn.cursor() as cur:
+            await cur.executemany(
+                "INSERT INTO lotsa_rows_table (id, task_index) VALUES (%s, %s)",
+                [(uuid.uuid4(), task_index) for i in range(50000)],
+            )
+    except BaseException as be:
+        excs.append(be)
+        raise
+
+
+async def crashing() -> None:
+    1 / 0
+
+
+async def main() -> None:
+    async with await connect() as conn:
+        await conn.execute("DROP TABLE IF EXISTS lotsa_rows_table")
+        await conn.execute(
+            "CREATE TABLE lotsa_rows_table (id uuid PRIMARY KEY, task_index INT)"
+        )
+
+    # create three tasks that each insert a large number of rows
+    conns = [await connect() for i in range(3)]
+    tasks = [asyncio.create_task(worker(i, conn=conns[i])) for i in range(3)]
+
+    # create one more task that fails immediately
+    tasks.append(asyncio.create_task(crashing()))
+
+    try:
+        await asyncio.gather(*tasks)
+    except ZeroDivisionError:
+        pass
+    else:
+        raise AssertionError("ZeroDivisionError not raised")
+
+
+if __name__ == "__main__":
+    if sys.platform == "win32":
+        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+    try:
+        asyncio.run(main())
+    finally:
+        assert excs, "No exception raised by workers"
+        for ex in excs:
+            assert isinstance(ex, asyncio.CancelledError), f"raised {{ex!r}} instead"
+"""
+
+    sp.run([sys.executable, "-s"], input=script, text=True, check=True)