- Fix `TypeInfo.fetch()` using a connection in `!sql_ascii` encoding
(:ticket:`#503`).
+- Fix canceling running queries on process interruption in async connections
+ (:ticket:`#543`).
Current release
async def wait(self, gen: PQGen[RV]) -> RV:
try:
return await waiting.wait_async(gen, self.pgconn.socket)
- except KeyboardInterrupt:
- # TODO: this doesn't seem to work as it does for sync connections
- # see tests/test_concurrency_async.py::test_ctrl_c
- # In the test, the code doesn't reach this branch.
-
+ except (asyncio.CancelledError, KeyboardInterrupt):
# On Ctrl-C, try to cancel the query in the server, otherwise
- # otherwise the connection will be stuck in ACTIVE state
+ # the connection will remain stuck in ACTIVE state.
c = self.pgconn.get_cancel()
c.cancel()
try:
sys.platform == "win32", reason="don't know how to Ctrl-C on Windows"
)
@pytest.mark.crdb_skip("cancel")
-def test_ctrl_c(dsn):
+def test_ctrl_c_handler(dsn):
if sys.platform == "win32":
sig = int(signal.CTRL_C_EVENT)
# Or pytest will receive the Ctrl-C too
assert 1 < t < 2
+@pytest.mark.slow
+@pytest.mark.subprocess
+@pytest.mark.skipif(
+ sys.platform == "win32", reason="don't know how to Ctrl-C on Windows"
+)
+@pytest.mark.crdb("skip")
+def test_ctrl_c(conn, dsn):
+ conn.autocommit = True
+
+ APPNAME = "test_ctrl_c"
+ script = f"""\
+import psycopg
+
+with psycopg.connect({dsn!r}, application_name={APPNAME!r}) as conn:
+ conn.execute("select pg_sleep(60)")
+"""
+
+ if sys.platform == "win32":
+ creationflags = sp.CREATE_NEW_PROCESS_GROUP
+ sig = signal.CTRL_C_EVENT
+ else:
+ creationflags = 0
+ sig = signal.SIGINT
+
+ proc = None
+
+ def run_process():
+ nonlocal proc
+ proc = sp.Popen(
+ [sys.executable, "-s", "-c", script],
+ creationflags=creationflags,
+ )
+ proc.communicate()
+
+ t = threading.Thread(target=run_process)
+ t.start()
+
+ for i in range(20):
+ cur = conn.execute(
+ "select pid from pg_stat_activity where application_name = %s", (APPNAME,)
+ )
+ rec = cur.fetchone()
+ if rec:
+ pid = rec[0]
+ break
+ time.sleep(0.1)
+ else:
+ assert False, "process didn't start?"
+
+ t0 = time.time()
+ assert proc
+ proc.send_signal(sig)
+ proc.wait()
+
+ for i in range(20):
+ cur = conn.execute("select 1 from pg_stat_activity where pid = %s", (pid,))
+ if not cur.fetchone():
+ break
+ time.sleep(0.1)
+ else:
+ assert False, "process didn't stop?"
+
+ t1 = time.time()
+ assert t1 - t0 < 1.0
+
+
@pytest.mark.slow
@pytest.mark.subprocess
@pytest.mark.skipif(
import time
import signal
import asyncio
+import threading
import subprocess as sp
from asyncio.queues import Queue
from typing import List, Tuple
sys.platform == "win32", reason="don't know how to Ctrl-C on Windows"
)
@pytest.mark.crdb_skip("cancel")
-def test_ctrl_c(dsn):
+def test_ctrl_c_handler(dsn):
script = f"""\
import signal
import asyncio
proc.send_signal(sig)
proc.communicate()
assert proc.returncode == 0
+
+
+@pytest.mark.slow
+@pytest.mark.subprocess
+@pytest.mark.skipif(
+ sys.platform == "win32", reason="don't know how to Ctrl-C on Windows"
+)
+@pytest.mark.crdb("skip")
+def test_ctrl_c(conn, dsn):
+ # https://github.com/psycopg/psycopg/issues/543
+ conn.autocommit = True
+
+ APPNAME = "test_ctrl_c"
+ script = f"""\
+import asyncio
+import psycopg
+
+async def main():
+ async with await psycopg.AsyncConnection.connect(
+ {dsn!r}, application_name={APPNAME!r}
+ ) as conn:
+ await conn.execute("select pg_sleep(5)")
+
+asyncio.run(main())
+"""
+ if sys.platform == "win32":
+ creationflags = sp.CREATE_NEW_PROCESS_GROUP
+ sig = signal.CTRL_C_EVENT
+ else:
+ creationflags = 0
+ sig = signal.SIGINT
+
+ proc = None
+
+ def run_process():
+ nonlocal proc
+ proc = sp.Popen(
+ [sys.executable, "-s", "-c", script],
+ creationflags=creationflags,
+ stderr=sp.PIPE,
+ )
+ proc.communicate()
+
+ t = threading.Thread(target=run_process)
+ t.start()
+
+ for i in range(20):
+ cur = conn.execute(
+ "select pid from pg_stat_activity where application_name = %s", (APPNAME,)
+ )
+ rec = cur.fetchone()
+ if rec:
+ pid = rec[0]
+ break
+ time.sleep(0.1)
+ else:
+ assert False, "process didn't start?"
+
+ t0 = time.time()
+ assert proc
+ proc.send_signal(sig)
+ proc.wait()
+
+ for i in range(20):
+ cur = conn.execute("select 1 from pg_stat_activity where pid = %s", (pid,))
+ if not cur.fetchone():
+ break
+ time.sleep(0.1)
+ else:
+ assert False, "process didn't stop?"
+
+ t1 = time.time()
+ assert t1 - t0 < 1.0