]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix(c): avoid a TypeError shadowing CancelledError on task cancellation 1119/head
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 11 Jul 2025 23:13:32 +0000 (01:13 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 12 Jul 2025 00:26:25 +0000 (02:26 +0200)
Fix #1005.

docs/news.rst
psycopg_c/psycopg_c/_psycopg/generators.pyx
tests/test_concurrency_async.py

index de486cdd0a17740db03f23f5ed30d0540cc9f93a..72c71ac012a5bdb4493b8c59d95dbcb4d7f98cfb 100644 (file)
@@ -10,6 +10,8 @@
 Psycopg 3.2.10 (unreleased)
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
+- Fix `!TypeError` shadowing `~asyncio.CancelledError` upon task cancellation
+  during pipeline execution (:ticket:`#1005`).
 - Fix memory leak when lambda/local functions are used as argument for
   `~.psycopg.types.json.set_json_dumps()`, `~.psycopg.types.json.set_json_loads()`
   (:ticket:`#1108`).
index f85c821c46c6b5902420edbf5a3d0e3a15058348..47f4a2f8336d7196305b0470d90bc18b372413f8 100644 (file)
@@ -270,7 +270,7 @@ def pipeline_communicate(
     cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
     cdef int cires
     cdef int status
-    cdef int ready
+    cdef int cready
     cdef libpq.PGresult *pgres
     cdef list res = []
     cdef list results = []
@@ -278,11 +278,17 @@ def pipeline_communicate(
 
     while True:
         while True:
+            # I don't quite get why, but we can receive a None upon async
+            # task cancellation. See #1005.
             ready = yield WAIT_RW
-            if ready:
+            if ready is None:
+                continue
+
+            cready = ready
+            if cready:
                 break
 
-        if ready & READY_R:
+        if cready & READY_R:
             with nogil:
                 cires = libpq.PQconsumeInput(pgconn_ptr)
             if 1 != cires:
@@ -325,7 +331,7 @@ def pipeline_communicate(
                     else:
                         res.append(r)
 
-        if ready & READY_W:
+        if cready & READY_W:
             pgconn.flush()
             if not commands:
                 break
index c69954b13b23b7085a85c550eb04cda4610ba56c..8f2123745f52380ca3a23979bbbea6851063c0ff 100644 (file)
@@ -402,3 +402,72 @@ async def test_transaction_concurrency(aconn, what):
     await asyncio.gather(t1)
     evs[2].set()
     await asyncio.gather(t2)
+
+
+@pytest.mark.slow
+def test_type_error_shadow(dsn):
+    script = f"""\
+import sys
+import uuid
+import asyncio
+
+import psycopg
+
+DSN = {dsn!r}
+excs = []
+
+
+async def connect() -> psycopg.AsyncConnection:
+    return await psycopg.AsyncConnection.connect(DSN, autocommit=True)
+
+
+async def worker(task_index: int, conn: psycopg.AsyncConnection) -> None:
+    try:
+        async with conn.cursor() as cur:
+            await cur.executemany(
+                "INSERT INTO lotsa_rows_table (id, task_index) VALUES (%s, %s)",
+                [(uuid.uuid4(), task_index) for i in range(50000)],
+            )
+    except BaseException as be:
+        excs.append(be)
+        raise
+
+
+async def crashing() -> None:
+    1 / 0
+
+
+async def main() -> None:
+    async with await connect() as conn:
+        await conn.execute("DROP TABLE IF EXISTS lotsa_rows_table")
+        await conn.execute(
+            "CREATE TABLE lotsa_rows_table (id uuid PRIMARY KEY, task_index INT)"
+        )
+
+    # create three tasks that each insert a large number of rows
+    conns = [await connect() for i in range(3)]
+    tasks = [asyncio.create_task(worker(i, conn=conns[i])) for i in range(3)]
+
+    # create one more task that fails immediately
+    tasks.append(asyncio.create_task(crashing()))
+
+    try:
+        await asyncio.gather(*tasks)
+    except ZeroDivisionError:
+        pass
+    else:
+        raise AssertionError("ValueError not raised")
+
+
+if __name__ == "__main__":
+    if sys.platform == "win32":
+        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+    try:
+        asyncio.run(main())
+    finally:
+        assert excs, "No exception raised by workers"
+        for ex in excs:
+            assert isinstance(ex, asyncio.CancelledError), f"raised {{ex!r}} instead"
+"""
+
+    sp.run([sys.executable, "-s"], input=script, text=True, check=True)