Psycopg 3.2.10 (unreleased)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
+- Fix `!TypeError` shadowing `~asyncio.CancelledError` upon task cancellation
+ during pipeline execution (:ticket:`#1005`).
- Fix memory leak when lambda/local functions are used as argument for
`~.psycopg.types.json.set_json_dumps()`, `~.psycopg.types.json.set_json_loads()`
(:ticket:`#1108`).
await asyncio.gather(t1)
evs[2].set()
await asyncio.gather(t2)
+
+
+@pytest.mark.slow
+def test_type_error_shadow(dsn):
+ script = f"""\
+import sys
+import uuid
+import asyncio
+
+import psycopg
+
+DSN = {dsn!r}
+excs = []
+
+
+async def connect() -> psycopg.AsyncConnection:
+ return await psycopg.AsyncConnection.connect(DSN, autocommit=True)
+
+
+async def worker(task_index: int, conn: psycopg.AsyncConnection) -> None:
+ try:
+ async with conn.cursor() as cur:
+ await cur.executemany(
+ "INSERT INTO lotsa_rows_table (id, task_index) VALUES (%s, %s)",
+ [(uuid.uuid4(), task_index) for i in range(50000)],
+ )
+ except BaseException as be:
+ excs.append(be)
+ raise
+
+
+async def crashing() -> None:
+ 1 / 0
+
+
+async def main() -> None:
+ async with await connect() as conn:
+ await conn.execute("DROP TABLE IF EXISTS lotsa_rows_table")
+ await conn.execute(
+ "CREATE TABLE lotsa_rows_table (id uuid PRIMARY KEY, task_index INT)"
+ )
+
+ # create three tasks that each insert a large number of rows
+ conns = [await connect() for i in range(3)]
+ tasks = [asyncio.create_task(worker(i, conn=conns[i])) for i in range(3)]
+
+ # create one more task that fails immediately
+ tasks.append(asyncio.create_task(crashing()))
+
+ try:
+ await asyncio.gather(*tasks)
+ except ZeroDivisionError:
+ pass
+ else:
+ raise AssertionError("ValueError not raised")
+
+
+if __name__ == "__main__":
+ if sys.platform == "win32":
+ asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+ try:
+ asyncio.run(main())
+ finally:
+ assert excs, "No exception raised by workers"
+ for ex in excs:
+ assert isinstance(ex, asyncio.CancelledError), f"raised {{ex!r}} instead"
+"""
+
+ sp.run([sys.executable, "-s"], input=script, text=True, check=True)