await conn2.close()
-@pytest.mark.xfail(reason="fix #231 for async connection")
@pytest.mark.slow
@pytest.mark.subprocess
+@pytest.mark.skipif(
+ sys.platform == "win32", reason="don't know how to Ctrl-C on Windows"
+)
async def test_ctrl_c(dsn):
script = f"""\
+import signal
import asyncio
import psycopg
ctrl_c = False
async def main():
+ loop = asyncio.get_event_loop()
async with await psycopg.AsyncConnection.connect({dsn!r}) as conn:
+ loop.add_signal_handler(signal.SIGINT, conn.cancel)
cur = conn.cursor()
try:
await cur.execute("select pg_sleep(2)")
- except KeyboardInterrupt:
+ except psycopg.errors.QueryCanceled:
ctrl_c = True
assert ctrl_c, "ctrl-c not received"