From: Daniele Varrazzo Date: Sun, 9 Apr 2023 18:29:44 +0000 (+0200) Subject: fix(async): cancel query upon receiving CanceledError in async wait X-Git-Tag: 3.1.9~8 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=eb9e1ad07af3501122e84e56f5c8822dc3f5f193;p=thirdparty%2Fpsycopg.git fix(async): cancel query upon receiving CanceledError in async wait This interrupt a running query upon Ctrl-C for example, which wasn't working as it was on sync connection. Close #543. --- diff --git a/docs/news.rst b/docs/news.rst index f0f547f3e..deff90a0f 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -15,6 +15,8 @@ Psycopg 3.1.9 (unreleased) - Fix `TypeInfo.fetch()` using a connection in `!sql_ascii` encoding (:ticket:`#503`). +- Fix canceling running queries on process interruption in async connections + (:ticket:`#543`). Current release diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 6d95d7641..f6842c6ed 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -346,13 +346,9 @@ class AsyncConnection(BaseConnection[Row]): async def wait(self, gen: PQGen[RV]) -> RV: try: return await waiting.wait_async(gen, self.pgconn.socket) - except KeyboardInterrupt: - # TODO: this doesn't seem to work as it does for sync connections - # see tests/test_concurrency_async.py::test_ctrl_c - # In the test, the code doesn't reach this branch. - + except (asyncio.CancelledError, KeyboardInterrupt): # On Ctrl-C, try to cancel the query in the server, otherwise - # otherwise the connection will be stuck in ACTIVE state + # the connection will remain stuck in ACTIVE state. c = self.pgconn.get_cancel() c.cancel() try: diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py index eec24f1df..7a5119c88 100644 --- a/tests/test_concurrency.py +++ b/tests/test_concurrency.py @@ -242,7 +242,7 @@ def test_identify_closure(conn_cls, dsn): sys.platform == "win32", reason="don't know how to Ctrl-C on Windows" ) @pytest.mark.crdb_skip("cancel") -def test_ctrl_c(dsn): +def test_ctrl_c_handler(dsn): if sys.platform == "win32": sig = int(signal.CTRL_C_EVENT) # Or pytest will receive the Ctrl-C too @@ -293,6 +293,72 @@ with psycopg.connect({dsn!r}) as conn: assert 1 < t < 2 +@pytest.mark.slow +@pytest.mark.subprocess +@pytest.mark.skipif( + sys.platform == "win32", reason="don't know how to Ctrl-C on Windows" +) +@pytest.mark.crdb("skip") +def test_ctrl_c(conn, dsn): + conn.autocommit = True + + APPNAME = "test_ctrl_c" + script = f"""\ +import psycopg + +with psycopg.connect({dsn!r}, application_name={APPNAME!r}) as conn: + conn.execute("select pg_sleep(60)") +""" + + if sys.platform == "win32": + creationflags = sp.CREATE_NEW_PROCESS_GROUP + sig = signal.CTRL_C_EVENT + else: + creationflags = 0 + sig = signal.SIGINT + + proc = None + + def run_process(): + nonlocal proc + proc = sp.Popen( + [sys.executable, "-s", "-c", script], + creationflags=creationflags, + ) + proc.communicate() + + t = threading.Thread(target=run_process) + t.start() + + for i in range(20): + cur = conn.execute( + "select pid from pg_stat_activity where application_name = %s", (APPNAME,) + ) + rec = cur.fetchone() + if rec: + pid = rec[0] + break + time.sleep(0.1) + else: + assert False, "process didn't start?" + + t0 = time.time() + assert proc + proc.send_signal(sig) + proc.wait() + + for i in range(20): + cur = conn.execute("select 1 from pg_stat_activity where pid = %s", (pid,)) + if not cur.fetchone(): + break + time.sleep(0.1) + else: + assert False, "process didn't stop?" + + t1 = time.time() + assert t1 - t0 < 1.0 + + @pytest.mark.slow @pytest.mark.subprocess @pytest.mark.skipif( diff --git a/tests/test_concurrency_async.py b/tests/test_concurrency_async.py index 67bb6afbd..80d3fc5cc 100644 --- a/tests/test_concurrency_async.py +++ b/tests/test_concurrency_async.py @@ -2,6 +2,7 @@ import sys import time import signal import asyncio +import threading import subprocess as sp from asyncio.queues import Queue from typing import List, Tuple @@ -192,7 +193,7 @@ async def test_identify_closure(aconn_cls, dsn): sys.platform == "win32", reason="don't know how to Ctrl-C on Windows" ) @pytest.mark.crdb_skip("cancel") -def test_ctrl_c(dsn): +def test_ctrl_c_handler(dsn): script = f"""\ import signal import asyncio @@ -238,3 +239,76 @@ asyncio.run(main()) proc.send_signal(sig) proc.communicate() assert proc.returncode == 0 + + +@pytest.mark.slow +@pytest.mark.subprocess +@pytest.mark.skipif( + sys.platform == "win32", reason="don't know how to Ctrl-C on Windows" +) +@pytest.mark.crdb("skip") +def test_ctrl_c(conn, dsn): + # https://github.com/psycopg/psycopg/issues/543 + conn.autocommit = True + + APPNAME = "test_ctrl_c" + script = f"""\ +import asyncio +import psycopg + +async def main(): + async with await psycopg.AsyncConnection.connect( + {dsn!r}, application_name={APPNAME!r} + ) as conn: + await conn.execute("select pg_sleep(5)") + +asyncio.run(main()) +""" + if sys.platform == "win32": + creationflags = sp.CREATE_NEW_PROCESS_GROUP + sig = signal.CTRL_C_EVENT + else: + creationflags = 0 + sig = signal.SIGINT + + proc = None + + def run_process(): + nonlocal proc + proc = sp.Popen( + [sys.executable, "-s", "-c", script], + creationflags=creationflags, + stderr=sp.PIPE, + ) + proc.communicate() + + t = threading.Thread(target=run_process) + t.start() + + for i in range(20): + cur = conn.execute( + "select pid from pg_stat_activity where application_name = %s", (APPNAME,) + ) + rec = cur.fetchone() + if rec: + pid = rec[0] + break + time.sleep(0.1) + else: + assert False, "process didn't start?" + + t0 = time.time() + assert proc + proc.send_signal(sig) + proc.wait() + + for i in range(20): + cur = conn.execute("select 1 from pg_stat_activity where pid = %s", (pid,)) + if not cur.fetchone(): + break + time.sleep(0.1) + else: + assert False, "process didn't stop?" + + t1 = time.time() + assert t1 - t0 < 1.0