]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: Cancel query on Ctrl-C
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 22 Feb 2022 03:02:13 +0000 (04:02 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 2 Mar 2022 02:21:35 +0000 (02:21 +0000)
On KeyboardInterrupt, send a cancel to the server and keep waiting for
the result of the cancel, which is expected to raise a QueryCanceled,
then re-raise KeyboardInterrupt.

Before this, the connection was left in ACTIVE state, so it couldn't be rolled
back.

Only fixed on sync connections. Left a failing test for async
connections; the test fails with an output from the script such as:

    error ignored in rollback on <psycopg.AsyncConnection [ACTIVE] ...>:
    sending query failed: another command is already in progress
    Traceback (most recent call last):
      File "<string>", line 27, in <module>
      File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
        self.run_forever()
      File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
        self._run_once()
      File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
        event_list = self._selector.select(timeout)
      File "/usr/lib/python3.8/selectors.py", line 468, in select
        fd_event_list = self._selector.poll(timeout, max_ev)
    KeyboardInterrupt

And the except branch in `AsyncConnection.wait()` is not reached.

See #231

docs/news.rst
psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
tests/test_concurrency.py
tests/test_concurrency_async.py

index 9388119d152f196c74b2e5de62705221ee4ff58a..f96a547525849cf3568bef479f734b7a1086aac6 100644 (file)
@@ -21,6 +21,13 @@ Psycopg 3.1 (unreleased)
 - Drop support for Python 3.6.
 
 
+Psycopg 3.0.10 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Leave the connection working after interrupting a query with Ctrl-C
+  (currently only for sync connections, :ticket:`#231`).
+
+
 Current release
 ---------------
 
index 0ca56d10475fb2a176218f5e02bd6d234d3415e0..37a606eb7e6463de4dce035ae357ec327a2cf18c 100644 (file)
@@ -857,7 +857,18 @@ class Connection(BaseConnection[Row]):
         The function must be used on generators that don't change connection
         fd (i.e. not on connect and reset).
         """
-        return waiting.wait(gen, self.pgconn.socket, timeout=timeout)
+        try:
+            return waiting.wait(gen, self.pgconn.socket, timeout=timeout)
+        except KeyboardInterrupt:
+            # On Ctrl-C, try to cancel the query in the server, otherwise
+            # otherwise the connection will be stuck in ACTIVE state
+            c = self.pgconn.get_cancel()
+            c.cancel()
+            try:
+                waiting.wait(gen, self.pgconn.socket, timeout=timeout)
+            except e.QueryCanceled:
+                pass  # as expected
+            raise
 
     @classmethod
     def _wait_conn(cls, gen: PQGenConn[RV], timeout: Optional[int]) -> RV:
index 8bd2ca73f6069c0cfb4c183c496f9301b5700436..e344778654c2a671f63a036dd90c3e11d0b600b3 100644 (file)
@@ -293,7 +293,22 @@ class AsyncConnection(BaseConnection[Row]):
                 yield n
 
     async def wait(self, gen: PQGen[RV]) -> RV:
-        return await waiting.wait_async(gen, self.pgconn.socket)
+        try:
+            return await waiting.wait_async(gen, self.pgconn.socket)
+        except KeyboardInterrupt:
+            # TODO: this doesn't seem to work as it does for sync connections
+            # see tests/test_concurrency_async.py::test_ctrl_c
+            # In the test, the code doesn't reach this branch.
+
+            # On Ctrl-C, try to cancel the query in the server, otherwise
+            # otherwise the connection will be stuck in ACTIVE state
+            c = self.pgconn.get_cancel()
+            c.cancel()
+            try:
+                await waiting.wait_async(gen, self.pgconn.socket)
+            except e.QueryCanceled:
+                pass  # as expected
+            raise
 
     @classmethod
     async def _wait_conn(cls, gen: PQGenConn[RV], timeout: Optional[int]) -> RV:
index 643c1f026a384a1d898ba4c4884cb91a496617b5..86e1fe410a7fa124a261f4638c685455fce2d397 100644 (file)
@@ -6,11 +6,13 @@ import os
 import sys
 import time
 import queue
-import pytest
+import signal
 import threading
 import subprocess as sp
 from typing import List
 
+import pytest
+
 import psycopg
 
 
@@ -201,3 +203,56 @@ def test_identify_closure(dsn):
     finally:
         conn.close()
         conn2.close()
+
+
+@pytest.mark.slow
+@pytest.mark.subprocess
+def test_ctrl_c(dsn):
+    if sys.platform == "win32":
+        sig = int(signal.CTRL_C_EVENT)
+        # Or pytest will receive the Ctrl-C too
+        creationflags = sp.CREATE_NEW_PROCESS_GROUP
+    else:
+        sig = int(signal.SIGINT)
+        creationflags = 0
+
+    script = f"""\
+import os
+import time
+import psycopg
+from threading import Thread
+
+def tired_of_life():
+    time.sleep(1)
+    os.kill(os.getpid(), {sig!r})
+
+t = Thread(target=tired_of_life, daemon=True)
+t.start()
+
+with psycopg.connect({dsn!r}) as conn:
+    cur = conn.cursor()
+    ctrl_c = False
+    try:
+        cur.execute("select pg_sleep(2)")
+    except KeyboardInterrupt:
+        ctrl_c = True
+
+    assert ctrl_c, "ctrl-c not received"
+    assert (
+        conn.info.transaction_status == psycopg.pq.TransactionStatus.INERROR
+    ), f"transaction status: {{conn.info.transaction_status!r}}"
+
+    conn.rollback()
+    assert (
+        conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE
+    ), f"transaction status: {{conn.info.transaction_status!r}}"
+
+    cur.execute("select 1")
+    assert cur.fetchone() == (1,)
+"""
+    t0 = time.time()
+    proc = sp.Popen([sys.executable, "-s", "-c", script], creationflags=creationflags)
+    proc.communicate()
+    t = time.time() - t0
+    assert proc.returncode == 0
+    assert 1 < t < 2
index 6a1df9684f1d01581b17e536039d31a12765ae08..adc95ce3e33d5c70993c754be1fbf154a77b8523 100644 (file)
@@ -1,9 +1,13 @@
+import sys
 import time
-import pytest
+import signal
 import asyncio
+import subprocess as sp
 from asyncio.queues import Queue
 from typing import List, Tuple
 
+import pytest
+
 import psycopg
 from psycopg._compat import create_task
 
@@ -151,3 +155,52 @@ async def test_identify_closure(dsn):
     finally:
         await aconn.close()
         await conn2.close()
+
+
+@pytest.mark.xfail(reason="fix #231 for async connection")
+@pytest.mark.slow
+@pytest.mark.subprocess
+async def test_ctrl_c(dsn):
+    script = f"""\
+import asyncio
+import psycopg
+
+ctrl_c = False
+
+async def main():
+    async with await psycopg.AsyncConnection.connect({dsn!r}) as conn:
+        cur = conn.cursor()
+        try:
+            await cur.execute("select pg_sleep(2)")
+        except KeyboardInterrupt:
+            ctrl_c = True
+
+        assert ctrl_c, "ctrl-c not received"
+        assert (
+            conn.info.transaction_status == psycopg.pq.TransactionStatus.INERROR
+        ), f"transaction status: {{conn.info.transaction_status!r}}"
+
+        await conn.rollback()
+        assert (
+            conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE
+        ), f"transaction status: {{conn.info.transaction_status!r}}"
+
+        await cur.execute("select 1")
+        assert (await cur.fetchone()) == (1,)
+
+asyncio.run(main())
+"""
+    if sys.platform == "win32":
+        creationflags = sp.CREATE_NEW_PROCESS_GROUP
+        sig = signal.CTRL_C_EVENT
+    else:
+        creationflags = 0
+        sig = signal.SIGINT
+
+    proc = sp.Popen([sys.executable, "-s", "-c", script], creationflags=creationflags)
+    with pytest.raises(sp.TimeoutExpired):
+        outs, errs = proc.communicate(timeout=1)
+
+    proc.send_signal(sig)
+    proc.communicate()
+    assert proc.returncode == 0